239 lines
5.6 KiB
C
239 lines
5.6 KiB
C
/*
|
|
* libhdfs engine
|
|
*
|
|
* this engine helps perform read/write operations on hdfs cluster using
|
|
* libhdfs. hdfs does not support modification of data once a file is created.
|
|
*
|
|
* so, to mimic that, create many files of small size (e.g. 256k), and this
|
|
* engine selects a file based on the offset generated by fio.
|
|
*
|
|
* thus, random reads and writes can also be achieved with this logic.
|
|
*
|
|
* NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
|
|
* to appropriate values for this engine to work properly
|
|
*
|
|
*/
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <unistd.h>
|
|
#include <sys/uio.h>
|
|
#include <errno.h>
|
|
#include <assert.h>
|
|
|
|
#include "../fio.h"
|
|
|
|
#include "hdfs.h"
|
|
|
|
struct hdfsio_data {
|
|
char host[256];
|
|
int port;
|
|
hdfsFS fs;
|
|
hdfsFile fp;
|
|
unsigned long fsbs;
|
|
unsigned long fscount;
|
|
unsigned long curr_file_id;
|
|
unsigned int numjobs;
|
|
unsigned int fid_correction;
|
|
};
|
|
|
|
static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
|
|
{
|
|
/* make sure that hdfsConnect is invoked before executing this function */
|
|
hdfsSetWorkingDirectory(hd->fs, "/.perftest");
|
|
hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
|
|
if (hd->fp) {
|
|
hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
|
|
hdfsCloseFile(hd->fs, hd->fp);
|
|
}
|
|
hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
|
|
if (hd->fp) {
|
|
hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
|
|
hdfsCloseFile(hd->fs, hd->fp);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
|
|
{
|
|
struct hdfsio_data *hd;
|
|
hdfsFileInfo *fi;
|
|
unsigned long f_id;
|
|
char fname[80];
|
|
int open_flags = 0;
|
|
|
|
hd = td->io_ops->data;
|
|
|
|
if (hd->curr_file_id == -1) {
|
|
/* see comment in fio_hdfsio_setup() function */
|
|
fio_hdfsio_setup_fs_params(hd);
|
|
}
|
|
|
|
/* find out file id based on the offset generated by fio */
|
|
f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
|
|
|
|
if (f_id == hd->curr_file_id) {
|
|
/* file is already open */
|
|
return 0;
|
|
}
|
|
|
|
if (hd->curr_file_id != -1) {
|
|
hdfsCloseFile(hd->fs, hd->fp);
|
|
}
|
|
|
|
if (io_u->ddir == DDIR_READ) {
|
|
open_flags = O_RDONLY;
|
|
} else if (io_u->ddir == DDIR_WRITE) {
|
|
open_flags = O_WRONLY;
|
|
} else {
|
|
log_err("hdfs: Invalid I/O Operation\n");
|
|
}
|
|
|
|
hd->curr_file_id = f_id;
|
|
do {
|
|
sprintf(fname, ".f%lu", f_id);
|
|
fi = hdfsGetPathInfo(hd->fs, fname);
|
|
if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
|
|
/* file has enough data to read OR file is opened in write mode */
|
|
hd->fp =
|
|
hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
|
|
hd->fsbs);
|
|
if (hd->fp) {
|
|
break;
|
|
}
|
|
}
|
|
/* file is empty, so try next file for reading */
|
|
f_id = (f_id + 1) % hd->fscount;
|
|
} while (1);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
|
|
{
|
|
if (ret != (int)io_u->xfer_buflen) {
|
|
if (ret >= 0) {
|
|
io_u->resid = io_u->xfer_buflen - ret;
|
|
io_u->error = 0;
|
|
return FIO_Q_COMPLETED;
|
|
} else
|
|
io_u->error = errno;
|
|
}
|
|
|
|
if (io_u->error)
|
|
td_verror(td, io_u->error, "xfer");
|
|
|
|
return FIO_Q_COMPLETED;
|
|
}
|
|
|
|
static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
|
|
{
|
|
struct hdfsio_data *hd;
|
|
int ret = 0;
|
|
|
|
hd = td->io_ops->data;
|
|
|
|
if (io_u->ddir == DDIR_READ) {
|
|
ret =
|
|
hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
|
|
} else if (io_u->ddir == DDIR_WRITE) {
|
|
ret =
|
|
hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
|
|
io_u->xfer_buflen);
|
|
} else {
|
|
log_err("hdfs: Invalid I/O Operation\n");
|
|
}
|
|
|
|
return fio_io_end(td, io_u, ret);
|
|
}
|
|
|
|
int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
|
|
{
|
|
struct hdfsio_data *hd;
|
|
|
|
hd = td->io_ops->data;
|
|
hd->fs = hdfsConnect(hd->host, hd->port);
|
|
hdfsSetWorkingDirectory(hd->fs, "/.perftest");
|
|
hd->fid_correction = (getpid() % hd->numjobs);
|
|
|
|
return 0;
|
|
}
|
|
|
|
int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
|
|
{
|
|
struct hdfsio_data *hd;
|
|
|
|
hd = td->io_ops->data;
|
|
hdfsDisconnect(hd->fs);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int fio_hdfsio_setup(struct thread_data *td)
|
|
{
|
|
struct hdfsio_data *hd;
|
|
struct fio_file *f;
|
|
static unsigned int numjobs = 1; /* atleast one job has to be there! */
|
|
numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
|
|
|
|
if (!td->io_ops->data) {
|
|
hd = malloc(sizeof(*hd));;
|
|
|
|
memset(hd, 0, sizeof(*hd));
|
|
td->io_ops->data = hd;
|
|
|
|
/* separate host and port from filename */
|
|
*(strchr(td->o.filename, ',')) = ' ';
|
|
sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
|
|
|
|
/* read fbs and fcount and based on that set f->real_file_size */
|
|
f = td->files[0];
|
|
#if 0
|
|
/* IMHO, this should be done here instead of fio_hdfsio_prep()
|
|
* but somehow calling it here doesn't seem to work,
|
|
* some problem with libhdfs that needs to be debugged */
|
|
hd->fs = hdfsConnect(hd->host, hd->port);
|
|
fio_hdfsio_setup_fs_params(hd);
|
|
hdfsDisconnect(hd->fs);
|
|
#else
|
|
/* so, as an alternate, using environment variables */
|
|
if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
|
|
hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
|
|
hd->fsbs = atol(getenv("FIO_HDFS_BS"));
|
|
} else {
|
|
log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
|
|
return 1;
|
|
}
|
|
#endif
|
|
f->real_file_size = hd->fscount * hd->fsbs;
|
|
|
|
td->o.nr_files = 1;
|
|
hd->curr_file_id = -1;
|
|
hd->numjobs = numjobs;
|
|
fio_file_set_size_known(f);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static struct ioengine_ops ioengine_hdfs = {
|
|
.name = "libhdfs",
|
|
.version = FIO_IOOPS_VERSION,
|
|
.setup = fio_hdfsio_setup,
|
|
.prep = fio_hdfsio_prep,
|
|
.queue = fio_hdfsio_queue,
|
|
.open_file = fio_hdfsio_open_file,
|
|
.close_file = fio_hdfsio_close_file,
|
|
.flags = FIO_SYNCIO,
|
|
};
|
|
|
|
/*
 * Hands the libhdfs operations table to register_ioengine(); tagged with
 * fio_init.
 */
static void fio_init fio_hdfsio_register(void)
{
	struct ioengine_ops *const engine = &ioengine_hdfs;

	register_ioengine(engine);
}
|
|
|
|
/*
 * Hands the libhdfs operations table to unregister_ioengine(); tagged with
 * fio_exit.
 */
static void fio_exit fio_hdfsio_unregister(void)
{
	struct ioengine_ops *const engine = &ioengine_hdfs;

	unregister_ioengine(engine);
}
|