Reading Data with Arrow FlightΒΆ

Client programs that read tables using Arrow Flight RPC typically follow the below recipe:

  1. Get a shared_ptr to a arrow::flight::FlightStreamReader via TableHandle::getFlightStreamReader

  2. Read the data using operations as described in Arrow Flight RPC

Consider the following program from cpp-examples/read_table_with_arrow_flight:

#include <iostream>
#include "deephaven/client/highlevel/client.h"
#include "deephaven/client/utility/table_maker.h"
#include "deephaven/client/utility/utility.h"

using deephaven::client::NumCol;
using deephaven::client::Client;
using deephaven::client::TableHandle;
using deephaven::client::TableHandleManager;
using deephaven::client::utility::flight::statusOrDie;
using deephaven::client::utility::TableMaker;

TableHandle makeTable(const TableHandleManager &manager) {
  TableMaker tm;
  std::vector<std::string> symbols{"FB", "AAPL", "NFLX", "GOOG"};
  std::vector<double> prices{101.1, 102.2, 103.3, 104.4};
  tm.addColumn("Symbol", symbols);
  tm.addColumn("Price", prices);
  return tm.makeTable(manager, "myTable");
}

void dumpSymbolColumn(const TableHandle &tableHandle) {
  auto fsr = tableHandle.getFlightStreamReader();
  while (true) {
    arrow::flight::FlightStreamChunk chunk;
    statusOrDie(fsr->Next(&chunk), "FlightStreamReader::Next()");
    if (chunk.data == nullptr) {
      break;
    }

    auto symbolChunk = chunk.data->GetColumnByName("Symbol");
    if (symbolChunk == nullptr) {
      throw std::runtime_error("Symbol column not found");
    }
    auto priceChunk = chunk.data->GetColumnByName("Price");
    if (priceChunk == nullptr) {
      throw std::runtime_error("Price column not found");
    }

    auto symbolAsStringArray = std::dynamic_pointer_cast<arrow::StringArray>(symbolChunk);
    auto priceAsDoubleArray = std::dynamic_pointer_cast<arrow::DoubleArray>(priceChunk);
    if (symbolAsStringArray == nullptr) {
      throw std::runtime_error("symbolChunk was not an arrow::StringArray");
    }
    if (priceAsDoubleArray == nullptr) {
      throw std::runtime_error("priceChunk was not an arrow::DoubleArray");
    }

    if (symbolAsStringArray->length() != priceAsDoubleArray->length()) {
      throw std::runtime_error("Lengths differ");
    }

    for (int64_t i = 0; i < symbolAsStringArray->length(); ++i) {
      auto symbol = symbolAsStringArray->GetView(i);
      auto price = priceAsDoubleArray->Value(i);
      std::cout << symbol << ' ' << price << '\n';
    }
  }
}

int main() {
  const char *server = "localhost:10000";
  auto client = Client::connect(server);
  auto manager = client.getManager();

  try {
    auto table = MakeTable(manager);
    DumpSymbolColumn(table);
  } catch (const std::runtime_error &e) {
    std::cerr << "Caught exception: " << e.what() << '\n';
  }
}