@giridhara Spark has built-in support for reading Hive tables: https://spark.apache.org/docs/2.1.1/sql-programming-guide.html#hive-tables. The issue is that column tables require a SnappySession, but enabling the Hive metastore (enableHiveSupport) would cause it to be used for column tables too, instead of the built-in meta-store. So the best option for now is to take the RDD[Row] from the Dataset and insert that into the column table. Something like:
val ds = spark.table("hiveTable")      // read via the hive-enabled session
val rdd = ds.rdd                       // extract the underlying RDD[Row]
// create a SnappySession on the same SparkContext
val session = new SnappySession(spark.sparkContext)
val df = session.createDataFrame(rdd, ds.schema)
df.write.format("column").saveAsTable("columnTable")
This can be made more efficient by using RDD[InternalRow], which avoids the round-trip conversion from InternalRow to Row and back; the required APIs are package-private, so your classes (or helper methods) would need to live in the org.apache.spark.sql package.
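A minimal sketch of that optimization, assuming SnappySession (which lives in org.apache.spark.sql) inherits the package-private SparkSession.internalCreateDataFrame of Spark 2.x; the helper object and its method name are hypothetical, not part of SnappyData:

// placed in org.apache.spark.sql so the package-private APIs are visible
package org.apache.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

object ColumnTableCopy {  // hypothetical helper
  def copy(spark: SparkSession, session: SnappySession,
           hiveTable: String, columnTable: String): Unit = {
    val ds = spark.table(hiveTable)
    // queryExecution.toRdd yields RDD[InternalRow] directly, skipping
    // the InternalRow -> Row conversion that ds.rdd performs
    val internalRdd: RDD[InternalRow] = ds.queryExecution.toRdd
    // internalCreateDataFrame is private[sql]; accessible from this package
    val df = session.internalCreateDataFrame(internalRdd, ds.schema)
    df.write.format("column").saveAsTable(columnTable)
  }
}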
@sumwale Hi, we got the exception below on a data server, and its status is always "waiting".
2017-08-31 15:19:17,094 DEBUG [pool-3-thread-47]: server.SnappyThriftServerThreadPool (SnappyThriftServerThreadPool.java:run(276)) - Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException: Channel closed.
at io.snappydata.thrift.common.SnappyTSocket.read(SnappyTSocket.java:373)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:634)
at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:501)
at io.snappydata.thrift.server.SnappyDataServiceImpl$Processor.process(SnappyDataServiceImpl.java:196)
at io.snappydata.thrift.server.SnappyThriftServerThreadPool$WorkerProcess.run(SnappyThriftServerThreadPool.java:270)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
__PRLS: 0 tokens, 0 locks held
gfxd-ddl-lock-service: 0 tokens, 0 locks held
TX states:
Full Thread Dump:
"SIGURG handler" Id=73 RUNNABLE
at sun.management.ThreadImpl.dumpThreads0(Native Method)
at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:454)
at com.pivotal.gemfirexd.internal.engine.locks.GfxdLocalLockService.generateThreadDump(GfxdLocalLockService.java:384)
at com.pivotal.gemfirexd.internal.engine.locks.GfxdLocalLockService.dumpAllRWLocks(GfxdLocalLockService.java:373)
at com.pivotal.gemfirexd.internal.engine.locks.GfxdDRWLockService.dumpAllRWLocks(GfxdDRWLockService.java:750)
at com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils.dumpStacks(GemFireXDUtils.java:2843)
at com.pivotal.gemfirexd.internal.engine.SigThreadDumpHandler.handle(SigThreadDumpHandler.java:107)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)
"Attach Listener" Id=72 RUNNABLE
"Idle OplogCompactor" Id=71 TIMED_WAITING on [I@7a8b28f8
at java.lang.Object.wait(Native Method)
- waiting on [I@7a8b28f8
at com.gemstone.gemfire.internal.cache.DiskStoreImpl.waitForIndexRecovery(DiskStoreImpl.java:5214)
at com.gemstone.gemfire.internal.cache.DiskStoreImpl.waitForIndexRecoveryEnd(DiskStoreImpl.java:5190)
at com.gemstone.gemfire.internal.cache.DiskStoreImpl$ValueRecoveryTask.run(DiskStoreImpl.java:4841)
- locked java.lang.Object@7eb4a3f6
at com.gemstone.gemfire.internal.cache.DiskStoreImpl$2.run(DiskStoreImpl.java:4985)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Number of locked synchronizers = 1
- java.util.concurrent.ThreadPoolExecutor$Worker@4ec02ecf
"Pooled Waiting Message Processor 1" Id=70 TIMED_WAITING on java.lang.Object@3f89a9cd
at java.lang.Object.wait(Native Method)
- waiting on java.lang.Object@3f89a9cd
at com.pivotal.gemfirexd.internal.engine.store.GemFireStore.waitForIndexLoadBegin(GemFireStore.java:2923)
at com.pivotal.gemfirexd.internal.engine.store.RegionEntryUtils$1.waitForAsyncIndexRecovery(RegionEntryUtils.java:744)
at com.gemstone.gemfire.internal.cache.DiskStoreImpl$IndexRecoveryTask.run(DiskStoreImpl.java:4710)
at com.gemstone.gemfire.internal.cache.DiskStoreImpl$2.run(DiskStoreImpl.java:4985)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at com.gemstone.gemfire.distributed.internal.DistributionManager.runUntilShutdown(DistributionManager.java:730)
at com.gemstone.gemfire.distributed.internal.DistributionManager$7$1.run(DistributionManager.java:1091)
at java.lang.Thread.run(Thread.java:745)
Number of locked synchronizers = 1
- java.util.concurrent.ThreadPoolExecutor$Worker@303269b4
"Asynchronous disk writer for region GFXD-DD-DISKSTORE" Id=68 TIMED_WAITING on java.lang.Object@558cc2b7
at java.lang.Object.wait(Native Method)
- waiting on java.lang.Object@558cc2b7
at java.lang.Object.wait(Object.java:460)
at java.util.concurrent.TimeUnit.timedWait(TimeUnit.java:348)
at com.gemstone.gemfire.internal.cache.DiskStoreImpl$FlusherThread.waitUntilFlushIsReady(DiskStoreImpl.java:1802)
at com.gemstone.gemfire.internal.cache.DiskStoreImpl$FlusherThread.run(DiskStoreImpl.java:1837)
at java.lang.Thread.run(Thread.java:745)
"Idle OplogCompactor" Id=67 TIMED_WAITING on [I@2d13452c
at java.lang.Object.wait(Native Method)
- waiting on [I@
from pyspark.sql.snappy import SnappySession
snappy = SnappySession(sc)
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
<ipython-input-7-b46794f153bf> in <module>()
----> 1 from pyspark.sql.snappy import SnappySession
2 snappy = SnappySession(op.sc)
ModuleNotFoundError: No module named 'pyspark.sql.snappy'
Is there a pip package or something else I am missing?