Creating a table with Arrow Flight
Client programs that create tables using Arrow Flight RPC typically follow the recipe below:
1. Get a FlightWrapper object via TableHandleManager::createFlightWrapper.
2. For Arrow calls like DoPut that take an arrow::flight::FlightCallOptions, endow that object with the Deephaven authentication headers via FlightWrapper::addAuthHeaders.
3. Get a pointer to the arrow::flight::FlightClient from FlightWrapper::flightClient.
4. Then perform the operations as described in the Arrow Flight RPC documentation.
Consider the following program from cpp-examples/create_table_with_arrow_flight:
#include <cstdint>
#include <exception>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include "deephaven/client/highlevel/client.h"
#include "deephaven/client/utility/table_maker.h"
using deephaven::client::NumCol;
using deephaven::client::Client;
using deephaven::client::TableHandle;
using deephaven::client::TableHandleManager;
using deephaven::client::utility::flight::statusOrDie;
using deephaven::client::utility::flight::valueOrDie;
using deephaven::client::utility::TableMaker;
// This example shows how to use the Arrow Flight client to make a simple table.
//
// Builds a two-column Arrow table ("Symbol": string, "Price": double) in memory,
// uploads it to the server with an Arrow Flight DoPut into a freshly allocated
// TableHandle, and then prints that table through the Deephaven high-level API.
//
// Parameters:
//   manager - used to obtain the FlightWrapper and to allocate the destination
//             TableHandle / FlightDescriptor pair.
//
// Errors:
//   Throws std::runtime_error if the symbol and price vectors differ in length.
//   Failing Arrow / Flight calls are reported through statusOrDie / valueOrDie
//   (NOTE(review): presumably these throw; main() catches std::runtime_error).
void doit(const TableHandleManager &manager) {
  // 1. Build schema
  arrow::SchemaBuilder schemaBuilder;

  // 2. Add "Symbol" column (type: string) to schema. The "deephaven:type"
  // field metadata names the server-side column type.
  auto symbolMetadata = std::make_shared<arrow::KeyValueMetadata>();
  statusOrDie(symbolMetadata->Set("deephaven:type", "java.lang.String"), "KeyValueMetadata::Set");
  auto symbolField = std::make_shared<arrow::Field>("Symbol",
      std::make_shared<arrow::StringType>(), true, std::move(symbolMetadata));
  statusOrDie(schemaBuilder.AddField(symbolField), "SchemaBuilder::AddField");

  // 3. Add "Price" column (type: double) to schema. The Arrow type must be
  // DoubleType so it agrees with the "double" metadata and with the
  // DoubleBuilder-produced array below. A StringType here would make the
  // RecordBatch inconsistent with its own schema.
  auto priceMetadata = std::make_shared<arrow::KeyValueMetadata>();
  statusOrDie(priceMetadata->Set("deephaven:type", "double"), "KeyValueMetadata::Set");
  auto priceField = std::make_shared<arrow::Field>("Price",
      std::make_shared<arrow::DoubleType>(), true, std::move(priceMetadata));
  statusOrDie(schemaBuilder.AddField(priceField), "SchemaBuilder::AddField");

  // 4. Schema is done
  auto schema = valueOrDie(schemaBuilder.Finish(), "Failed to create schema");

  // 5. Prepare symbol and price data. Compare the sizes as size_t first, then
  // narrow to Arrow's int64_t row count (avoids a signed/unsigned comparison).
  std::vector<std::string> symbols{"FB", "AAPL", "NFLX", "GOOG"};
  std::vector<double> prices{101.1, 102.2, 103.3, 104.4};
  if (symbols.size() != prices.size()) {
    throw std::runtime_error("sizes don't match");
  }
  auto numRows = static_cast<int64_t>(symbols.size());

  // 6. Move data to Arrow column builders. AppendValues returns an
  // arrow::Status, which must be checked rather than silently discarded.
  arrow::StringBuilder symbolBuilder;
  arrow::DoubleBuilder priceBuilder;
  statusOrDie(symbolBuilder.AppendValues(symbols), "symbolBuilder.AppendValues()");
  statusOrDie(priceBuilder.AppendValues(prices), "priceBuilder.AppendValues()");

  // 7. Get Arrow columns from builders (order must match the schema fields)
  std::vector<std::shared_ptr<arrow::Array>> columns = {
      valueOrDie(symbolBuilder.Finish(), "symbolBuilder.Finish()"),
      valueOrDie(priceBuilder.Finish(), "priceBuilder.Finish()")
  };

  // 8. Get a Deephaven "FlightWrapper" object to access Arrow Flight
  auto wrapper = manager.createFlightWrapper();

  // 9. Allocate a TableHandle and get its corresponding Arrow flight descriptor
  auto [table, fd] = manager.newTableHandleAndFlightDescriptor();

  // 10. DoPut takes FlightCallOptions, which need to at least contain the Deephaven
  // authentication headers for this session.
  arrow::flight::FlightCallOptions options;
  wrapper.addAuthHeaders(&options);

  // 11. Perform the doPut
  std::unique_ptr<arrow::flight::FlightStreamWriter> fsw;
  std::unique_ptr<arrow::flight::FlightMetadataReader> fmr;
  statusOrDie(wrapper.flightClient()->DoPut(options, fd, schema, &fsw, &fmr), "DoPut failed");

  // 12. Make a RecordBatch containing both the schema and the data. Validate it
  // locally so a schema/column mismatch is reported here, not by the server.
  auto batch = arrow::RecordBatch::Make(schema, numRows, std::move(columns));
  statusOrDie(batch->Validate(), "RecordBatch::Validate failed");
  statusOrDie(fsw->WriteRecordBatch(*batch), "WriteRecordBatch failed");
  statusOrDie(fsw->DoneWriting(), "DoneWriting failed");

  // 13. Read back a metadata message (ignored), then close the Writer
  std::shared_ptr<arrow::Buffer> buf;
  statusOrDie(fmr->ReadMetadata(&buf), "ReadMetadata failed");
  statusOrDie(fsw->Close(), "Close failed");

  // 14. Use Deephaven high level operations to fetch the table and print it
  std::cout << "table is:\n" << table.stream(true) << std::endl;
}
// Entry point: connects to a local Deephaven server and runs the example.
//
// Connection setup is inside the try block so that a failure to connect is
// reported the same way as a failure in doit(). Any std::exception is caught.
// The process exit status is non-zero on failure, so scripts and CI can detect it.
int main() {
  const char *server = "localhost:10000";
  try {
    auto client = Client::connect(server);
    auto manager = client.getManager();
    doit(manager);
  } catch (const std::exception &e) {
    std::cerr << "Caught exception: " << e.what() << '\n';
    return 1;
  }
  return 0;
}