    kamisama-rney
    @kamisama-rney

    @sloshy It appears I've found an issue. I haven't started debugging it since it's late, but I got an exception from the listObjects call:

    Exception in thread "main" java.lang.NullPointerException
        at com.rewardsnetwork.pureaws.s3.S3ObjectInfo$.fromS3Object(S3ObjectInfo.scala:35)
        at com.rewardsnetwork.pureaws.s3.S3ObjectOps$$anon$1.$anonfun$listObjectsPaginated$12(S3ObjectOps.scala:140)
        at scala.collection.immutable.List.map(List.scala:246)
        at com.rewardsnetwork.pureaws.s3.S3ObjectOps$$anon$1.$anonfun$listObjectsPaginated$10(S3ObjectOps.scala:140)

    The folder I'm pulling has 14,823 files in it. I decided not to try to make a Stream initially. I will say this project is really bringing this senior engineer down a few notches. I've included the entire POC class due to the exception.

    package aws.s3.reader
    
    import cats.effect._
    import com.rewardsnetwork.pureaws.s3.{PureS3Client, S3ObjectOps}
    import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
    import software.amazon.awssdk.core.retry.RetryPolicy
    import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
    import software.amazon.awssdk.regions.Region
    import software.amazon.awssdk.services.s3.S3AsyncClient
    
    import java.time.Duration
    
    class BackfillApp[F[_]: ConcurrentEffect: Sync: ContextShift] {
    
      def awsAsyncS3Client(): S3AsyncClient = {
        val region = Region.of("us-west-2")
        val clientConfiguration = ClientOverrideConfiguration
          .builder()
          .apiCallAttemptTimeout(Duration.ofMillis(30000))
          .retryPolicy(RetryPolicy.builder().numRetries(5).build())
          .apiCallTimeout(Duration.ofMillis(30000))
    
        val awsClient =
          S3AsyncClient
            .builder()
            .region(region)
            .httpClientBuilder(
              NettyNioAsyncHttpClient
                .builder()
                .maxConcurrency(100)
                .connectionTimeout(Duration.ofMillis(60000))
            )
            .credentialsProvider(
              software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create()
            )
            .overrideConfiguration(clientConfiguration.build())
            .build()
        awsClient
      }
    
      def processData(sourceBucket: String, prefix: String, server: String, index: String, parallelism: Int) = {
    
        val s3AsyncClient = Blocker[F].flatMap(Resource.fromAutoCloseableBlocking(_)(Sync[F].delay(awsAsyncS3Client())))
        val pureClient = s3AsyncClient.map(PureS3Client.apply[F])
        val bucketAndObjectOps = pureClient.map(p => S3ObjectOps(p))
    
        bucketAndObjectOps.use { objectOps =>
          val listings = objectOps.listObjects(bucket = sourceBucket, prefix = Some(prefix), requestPayer = None)
          listings
        }
      }
    }
    
    object Main extends IOApp {
    
      def run(args: List[String]): IO[ExitCode] = {
        val foo = new BackfillApp[IO]()
    
        val test = foo.processData("backfill-queue", "0", "", "", 100).unsafeRunSync()
        IO(ExitCode.Success) //.compile.last.map(_ => ExitCode.Success)
      }
    }

    Also question, shouldn't it be possible to call listings.flatMap on the listings member in the resource use block? It's a F[S3ObjectListing] and I want to create a fs2.Stream from the .objects member but I can't get to it.

    Gajendra Naidu Thalapaneni
    @GajendraNaidu
    It is throwing an NPE because the owner field is null in S3Object:
    def fromS3Object(o: S3Object, bucket: String) =
        S3ObjectInfo(bucket, o.key, o.lastModified, o.eTag, o.owner.displayName, o.owner.id, o.size)
    kamisama-rney
    @kamisama-rney
    So if one didn't want to change the interface
      def fromS3Object(o: S3Object, bucket: String) = {
        val (displayName, ownerId) = Option(o.owner) match {
          case Some(owner) => owner.displayName -> owner.id
          case None => "" -> ""
        }
    
        S3ObjectInfo(bucket, o.key, o.lastModified, o.eTag, displayName, ownerId, o.size)
      }
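    The workaround above generalizes to any nullable field crossing a Java interop boundary. A minimal standalone sketch (the Owner case class here is a stand-in for the AWS SDK's Owner type, not the real one):

    ```scala
    // Stand-in for the AWS SDK's Owner type, for illustration only.
    final case class Owner(displayName: String, id: String)

    object NullSafe {
      // Option(x) yields None when x is null and Some(x) otherwise -- the
      // idiomatic way to absorb Java nulls before they can NPE downstream.
      def ownerFields(owner: Owner): (String, String) =
        Option(owner).fold("" -> "")(o => o.displayName -> o.id)
    }
    ```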
    Ryan Peters
    @sloshy
    @kamisama-rney @GajendraNaidu ah damn, Java nulls strike again. I'll make a note of this and implement a fix when I have time. PRs are also very welcome
    kamisama-rney
    @kamisama-rney
    I was going to say, Java nulls are a classic interop hazard
    I've been caught more than once on that when consuming Java Protos
    Ryan Peters
    @sloshy
    Double-checking the API docs to see if/when this is supposed to be null, and I think the best option would be changing the interface
    This library is new enough and has few enough users (probably even fewer who depend on the owner portion) that a version introducing a small breaking change is the best thing to do
    kamisama-rney
    @kamisama-rney
    Make owner a nested object so you can Option[Owner] for simplicity?
    Ryan Peters
    @sloshy
    yeah it'll likely just get wrapped in Option
    Owner owner() - The owner of the object
    kamisama-rney
    @kamisama-rney
    That's what I was thinking
    S3ObjectInfo(bucket, o.key, o.lastModified, o.eTag, Option(o.owner), o.size)
    Ryan Peters
    @sloshy
    looks like they don't say it's nullable
    Makes me wonder what else could secretly be nullable
    Ryan Peters
    @sloshy
    @kamisama-rney what does it look like for you, in S3, since owner is null? Is this for everything in the bucket or just some objects?
    kamisama-rney
    @kamisama-rney
    Also, on another note: I finally managed to resolve my ambiguous FlatMap import, so I'm back to this one. I'm still trying to figure out what simple thing I'm missing. Shouldn't I be able to take the resulting List[S3ObjectInfo] returned from listObjects and stuff it into an fs2.Stream like this?
      def processData(sourceBucket: String, prefix: String, server: String, index: String, parallelism: Int) = {
    
        val s3AsyncClient = Blocker[F].flatMap(Resource.fromAutoCloseableBlocking(_)(Sync[F].delay(awsAsyncS3Client())))
        val pureClient = s3AsyncClient.map(PureS3Client.apply[F])
        val bucketAndObjectOps = pureClient.map(p => S3ObjectOps(p))
    
        bucketAndObjectOps.use { objectOps =>
          val listings = objectOps.listObjects(bucket = sourceBucket, prefix = Some(prefix), requestPayer = None)
          fs2.Stream.emits(listings.flatMap(x => x.objects))
        }
      }
    Ryan Peters
    @sloshy
    Yes that should work, however, you're returning a Stream from usage of a Resource which might be the issue you are seeing
    If you call it like Stream.resource(bucketAndObjectOps).flatMap { objectOps => ... } instead of calling use, it should work.
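    The distinction sloshy is drawing here, a Stream escaping a Resource's use block versus the stream owning the resource, can be sketched roughly like this (fs2 2.x / cats-effect 2 style; clientRes is a hypothetical resource standing in for the S3 client wiring, not the library's API):

    ```scala
    import cats.effect.{IO, Resource}
    import fs2.Stream

    // Hypothetical resource standing in for the S3 client setup above.
    val clientRes: Resource[IO, String] =
      Resource.make(IO.pure("client"))(_ => IO.unit)

    // Problematic shape: a Stream built inside `use` escapes the block, so
    // the resource is released before the caller ever runs the stream:
    //   clientRes.use(client => IO.pure(Stream.emit(client)))

    // Correct shape: Stream.resource subsumes the resource into the
    // stream's lifetime, releasing it only when the stream terminates.
    val good: Stream[IO, String] =
      Stream.resource(clientRes).flatMap(client => Stream.emit(client))
    ```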
    kamisama-rney
    @kamisama-rney
    Okay, that solved the "could not find implicit value for parameter F: cats.effect.BracketThrow[[x]Any]" error
    Ryan Peters
    @sloshy
    FS2, Monix, etc. all have the ability to subsume the scope of a resource, which means you can have automatic resource management as part of your stream lifetime. Considering you want to return the result stream, that seems like what you should do. Either that, or an F[Stream[F, S3Object]] which would then need to be evaluated by the caller (or before returning)
    kamisama-rney
    @kamisama-rney
    Had to change the emits to fs2.Stream.evals(listings.map(x => x.objects))
    Last error is
    type mismatch;
     found   : Unit
     required: F[?]
            .parEvalMap(parallelism)(loadBatch)
    Ryan Peters
    @sloshy
    Ok, so what is the return type of loadBatch there? It should be in F[Unit] shape, so if you're just doing a raw println then you need to wrap it with Sync[F].delay
    By the way in Cats Effect 3 (in release candidate stage) it will have a built-in Console[F] algebra for doing arbitrary println-type things
    But in CE2 there is also console4cats, or you can wrap your calls yourself as mentioned
    This library does not have a CE3 release yet but it will get one soon
    kamisama-rney
    @kamisama-rney
    Ah, I did a defer instead of a delay
    So now
      def processData(sourceBucket: String, prefix: String, server: String, index: String, parallelism: Int) = {
    
        val s3AsyncClient = Blocker[F].flatMap(Resource.fromAutoCloseableBlocking(_)(Sync[F].delay(awsAsyncS3Client())))
        val pureClient = s3AsyncClient.map(PureS3Client.apply[F])
        val bucketAndObjectOps = pureClient.map(p => S3ObjectOps(p))
    
        fs2.Stream.resource(bucketAndObjectOps).flatMap { objectOps =>
          val listings = objectOps.listObjects(bucket = sourceBucket, prefix = Some(prefix), requestPayer = None)
          fs2.Stream.evals(listings.map(x => x.objects))
            .parEvalMap(parallelism)(loadBatch)
        }
      }
    
      def loadBatch(file: S3ObjectInfo) = {
        Sync[F].delay(Console.println(file.key))
      }
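    For reference, the delay/defer mix-up mentioned above comes down to their signatures; a sketch assuming cats-effect 2:

    ```scala
    import cats.effect.Sync

    // Sync[F].delay takes (=> A)    and returns F[A] -- suspends a side effect.
    // Sync[F].defer takes (=> F[A]) and returns F[A] -- flattens a by-name F[A].
    def printKey[F[_]: Sync](key: String): F[Unit] =
      Sync[F].delay(println(key)) // F[Unit], the shape parEvalMap expects

    // Sync[F].defer(println(key)) would not typecheck: println returns Unit,
    // not F[Unit], matching the "found: Unit, required: F[?]" error above.
    ```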
    Ryan Peters
    @sloshy
    Can you see the inferred type of loadBatch just so we're sure? It should be F[Unit] at this point
    kamisama-rney
    @kamisama-rney
    It is
    Ryan Peters
    @sloshy
    Great
    kamisama-rney
    @kamisama-rney
    Might want to add something like this to the README.md or an examples project. When I come across new libraries I tend to like to grab a piece of working sample code and reverse engineer it.
    Ryan Peters
    @sloshy
    I plan to have some specific walkthrough examples once I get around to building up the docs
    I'm planning on making a microsite with compiled documentation that shows you the results
    Which is admittedly a little tricky w/ AWS since to get examples to compile and return results, you have to fake it or otherwise talk to AWS every time you build the site... Not going to do that.
    kamisama-rney
    @kamisama-rney
    I hear you there; the example is likely better in the README.md for cut-and-paste. I believe the fs2.kafka library is that way due to a similar issue
    Ryan Peters
    @sloshy
    Yeah, they have all their docs up on the microsite and nothing on github from what I see https://fd4s.github.io/fs2-kafka/docs/quick-example
    That's a pretty great library btw. Used it many a time
    kamisama-rney
    @kamisama-rney
    We use it as well, it was our first venture into FS2. Thus my frustration of having issues with the AWS one.
    We've been using fs2-kafka for over a year, so I thought "I'll just whip up a quick S3 bucket crawler for backfilling an Elasticsearch index" and got knocked down a few pegs
    Ryan Peters
    @sloshy
    @kamisama-rney By all means, if there is anything about the library UX that is confusing or warrants further instruction, I'm happy to help and build up docs for your case
    Fortunately in this case (besides the NPE) it seems like the issue was more lack of familiarity with FS2's core API, does that sound accurate?
    kamisama-rney
    @kamisama-rney
    Much of it is definitely my lack of wide-ranging knowledge of FS2's core APIs. I suspect I know enough to be dangerous to myself and small furry animals.
    Ryan Peters
    @sloshy
    In any case - I wrote this library while working for Rewards Network and while I still co-maintain it, I work as a consultant for 47 Degrees these days. If you or your company ever need any additional help w.r.t. functional programming in Scala (beyond the helping out on Gitter I do in my free time of course) you can reach me at ryan.peters@47deg.com
    I'll keep checking back in the meantime if you run into any other issues, or have any feature/bugfix requests
    I've already logged the NPE on github so either myself or @mrerrormessage will get to it, probably by the end of the weekend (hopefully today)
    kamisama-rney
    @kamisama-rney
    k, thanks. hopefully I've crossed the threshold into the area I'm more familiar with
    Ryan Peters
    @sloshy
    @kamisama-rney Thanks for the PR! I just left a single comment but overall it looks good
    Ryan Peters
    @sloshy
    @kamisama-rney thanks to your PR I put out a release tonight containing your fix! It should be propagating soon https://github.com/rewards-network/pure-aws/releases/tag/v0.5.0
    Thank you for your patience and let us know if you run into any further difficulties