These are chat archives for cloudera/kudu

20th
Jun 2018
Ricardo Gaspar
@ricardogaspar2
Jun 20 2018 09:17
thanks @danburkert ;)
Ricardo Gaspar
@ricardogaspar2
Jun 20 2018 09:28

Hi, I'm having difficulty using a Kudu predicate together with a lowerBound in a Kudu scanner.
My goal is to get a batch of rows (after applying a predicate) and store them in a Set (LinkedHashSet) that I return to my program, which then processes them.
To do this, on each call to my method I pass the last record I've read and use it with the lowerBound function.

 scannerUpdated = kc.newScannerBuilder(table).addPredicate(predicate).limit(recordsToRead).lowerBound(lastRecordPartialRow).build();

From my execution logs, I've seen that for a table with 7000 rows (all of which satisfy the predicate) and a batch size of 1000, I sometimes get fewer than 1000 rows. The most intriguing thing is that sometimes the last record of the previous batch (the last record in the set returned to the calling class) is not the same as the first RowResult returned by the next scan.

How I'm doing it:
 private Set<? extends Record> getRecentlyUpdatedRecords(String tableName, KuduPredicate predicate,
                                                            int recordsToRead, Record lastRecord,
                                                            PartialRow lastRecordPartialRow) throws
            InvalidRecordException,
            IOException {
        long start = System.currentTimeMillis();

        Set<Record> recordsSet = new LinkedHashSet<>(recordsToRead);

        try {
            if (kc.tableExists(fullTableName(tableName))) {
                KuduTable table = kc.openTable(fullTableName(tableName));

                KuduScanner scannerUpdated;

                if (lastRecord != null) {
                    // start from the last record processed
                    scannerUpdated =
                            kc.newScannerBuilder(table).addPredicate(predicate).limit(recordsToRead).lowerBound(
                                    lastRecordPartialRow).build();
                } else {
                    scannerUpdated = kc.newScannerBuilder(table).addPredicate(predicate).limit(recordsToRead).build();
                }
                LOGGER.info("Kudu Scanner read rows limit: " + scannerUpdated.getLimit());


                int rowCounter = 0;
                while (scannerUpdated.hasMoreRows() && rowCounter < recordsToRead) {
                    RowResultIterator results = scannerUpdated.nextRows();
                    while (results != null && results.hasNext() && rowCounter < recordsToRead) {
                        RowResult rowData = results.next();
                        Record record = createRecordFromRow(rowData, tableName);

                        if (lastRecord != null && lastRecord.equals(record)) {
                            LOGGER.debug("Skipping current record (previously processed): " + record.toString() + " lastRecord: " +
                                    lastRecord.toString());
                        } else {
                            recordsSet.add(record);
                            LOGGER.debug("record added: " + record.toString());
                            rowCounter++;
                        }
                    }
                }

            }
        } catch (KuduException e) {
            closeSession();
            e.printStackTrace();
            throw new IOException(e);
        } finally {
            String methodName = new Object() {
            }.getClass().getEnclosingMethod().getName();
            LOGGER.info("KUDU " + methodName + " run time = " + (System.currentTimeMillis() - start) + " ms");
        }

        return recordsSet;
    }
Ricardo Gaspar
@ricardogaspar2
Jun 20 2018 09:37
It seems to me that the lowerBound is being applied before the predicate. Is it?
Is there any way I can specify the order in which the restrictions are applied (predicate before the lowerBound)?
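
A minimal sketch of the batching pattern described above, in plain Java with no Kudu dependency. A `TreeMap` stands in for the table, and `KeysetPagingSketch`, `nextBatch`, and `scanLimit` are hypothetical names, not Kudu client API. It assumes rows come back in primary-key order, that the lower bound is inclusive, and that the limit counts every row the scanner returns, including the boundary row the caller then skips. Under those assumptions a resumed batch with a limit of 1000 yields only 999 new rows; whether the real scanner behaves the same way should be checked against the Kudu documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the batching pattern. The TreeMap plays the role of a
// table sorted by primary key; the Boolean value says whether the row passes the
// predicate. None of this is Kudu client API.
class KeysetPagingSketch {

    // Simulates one scan: inclusive lower bound at lastKey, predicate filter,
    // then a limit on the rows handed back. The caller-side skip of lastKey
    // happens after the limit has already counted that row.
    static List<Integer> nextBatch(NavigableMap<Integer, Boolean> table,
                                   Integer lastKey, int scanLimit) {
        NavigableMap<Integer, Boolean> view =
                (lastKey == null) ? table : table.tailMap(lastKey, true);
        List<Integer> batch = new ArrayList<>();
        int returned = 0; // rows the simulated scanner has handed back so far
        for (Map.Entry<Integer, Boolean> e : view.entrySet()) {
            if (!e.getValue()) {
                continue; // row rejected by the predicate
            }
            if (returned == scanLimit) {
                break; // limit reached (assumed to count the boundary row too)
            }
            returned++;
            if (e.getKey().equals(lastKey)) {
                continue; // boundary row was already processed in the previous batch
            }
            batch.add(e.getKey());
        }
        return batch;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Boolean> table = new TreeMap<>();
        for (int k = 1; k <= 7000; k++) {
            table.put(k, true); // every row satisfies the predicate
        }
        System.out.println(nextBatch(table, null, 1000).size()); // first batch
        System.out.println(nextBatch(table, 1000, 1000).size()); // resumed, same limit
        System.out.println(nextBatch(table, 1000, 1001).size()); // resumed, limit + 1
    }
}
```

Requesting one extra row on resumed batches (a limit of `recordsToRead + 1`) is one way to compensate for the skipped boundary row. The sketch does not model scans whose rows arrive out of key order, which would be a separate reason for the first returned row to differ from the last record of the previous batch.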