Need help with a ZIO-gRPC implementation

Feb 14, 2025 - 21:34
See also this Stack Overflow question: https://stackoverflow.com/questions/79426489/need-help-implementing-grpc-pub-sub-using-zio
I can connect to the gRPC server from Scala and receive a FetchResponse by calling the Subscribe API with a plain gRPC StreamObserver. When I use the same configuration with ZIO (zio-grpc), I do not receive any response.

I have the following configuration:
host: api.salesforce.pubsub.com
port: 7443
tenantId: "xxx"
accessToken: "yyy"
instanceUrl: "https://zzz.com"
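
For reference, the settings above and the three request headers derived from them can be sketched with the standard library only. This is a minimal sketch, not part of my actual project: `PubSubConfig`, `authHeaders` and `PubSubConfigExample` are hypothetical names, and the values are the placeholders from this post. The header names match the ones used in the interceptor further down.

```scala
import java.net.URI

// Hypothetical holder for the connection settings listed above.
final case class PubSubConfig(
  host: String,
  port: Int,
  tenantId: String,
  accessToken: String,
  instanceUrl: String
) {
  // Header names mirror the ones used in the interceptor later in this post.
  def authHeaders: Seq[(String, String)] = Seq(
    "tenantid"    -> tenantId,
    "accesstoken" -> accessToken,
    // resolve("/") keeps only scheme and authority and sets the path to "/".
    "instanceurl" -> new URI(instanceUrl).resolve("/").toString
  )
}

object PubSubConfigExample {
  // Placeholder values copied from this post; not real credentials.
  val config: PubSubConfig = PubSubConfig(
    host = "api.salesforce.pubsub.com",
    port = 7443,
    tenantId = "xxx",
    accessToken = "yyy",
    instanceUrl = "https://zzz.com"
  )
}
```

With the placeholder instance URL, the `instanceurl` header value comes out as `https://zzz.com/`, i.e. any path or query on the original URL is dropped.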

With this configuration I build a ManagedChannel and call the Subscribe API with a FetchRequest(topicName). Using a StreamObserver, I successfully receive the FetchResponse.

With the same configuration, the zio-grpc version receives no FetchResponse and does not report an error either. Below is the zio-grpc implementation.

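import java.net.URI

import io.grpc.{ManagedChannelBuilder, StatusException}
import scalapb.zio_grpc.{SafeMetadata, ZClientInterceptor, ZManagedChannel}
import zio._
import zio.stream.ZStream
// Plus the ScalaPB-generated Pub/Sub classes (PubSubClient, FetchRequest, FetchResponse, EARLIEST).
// host, port, tenantId, accessToken and instanceUrl hold the configuration values listed above.

object Main extends ZIOAppDefault {

  // Builds the channel and attaches the three auth headers to every call.
  def managedChannel(): ZManagedChannel = {
    val managedChannelBuilder: ManagedChannelBuilder[_] =
      ManagedChannelBuilder.forAddress(host, port)

    val interceptors = Seq(ZClientInterceptor.headersReplacer { (_, _) =>
      SafeMetadata.make(
        ("tenantid", tenantId),
        ("accesstoken", accessToken),
        ("instanceurl", new URI(instanceUrl).resolve("/").toString)
      )
    })

    ZManagedChannel(managedChannelBuilder, interceptors)
  }

  // Request stream: emits a single FetchRequest and then completes.
  def requestBody(): zio.stream.Stream[StatusException, FetchRequest] =
    ZStream.succeed(FetchRequest(topicName = "topic", replayPreset = EARLIEST))

  // Calls Subscribe; a failure is logged and mapped to None.
  def subscribeAndFetch(): ZStream[PubSubClient, StatusException, Option[FetchResponse]] =
    for {
      _ <- ZStream.logInfo("fetching..")
      response <- PubSubClient
        .subscribe(requestBody())
        .map(response => Some(response))
        .catchAll(ex =>
          ZStream.logError(s"the subscribe request failed: ${ex.getStatus}").as(None)
        )
    } yield response

  // Drains the response stream and repeats the call every 2 seconds.
  override def run =
    subscribeAndFetch()
      .runDrain
      .repeat(Schedule.spaced(2.seconds))
      .provideLayer(PubSubClient.live(managedChannel()))
}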