I'm trying to learn how to use Alpakka and have set up a test to write a document to Elasticsearch. After reading the docs, including https://doc.akka.io/docs/alpakka/current/elasticsearch.html, I have written the following:
import akka.actor.ActorSystem
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import akka.stream.alpakka.elasticsearch._
import akka.stream.scaladsl.Source
import spray.json.DefaultJsonProtocol._
import spray.json.{JsonFormat, _}
// First attempt: streams a single VolResult into the "ccy_vol_normalized" index
// through Alpakka's ElasticsearchSink. This is the version that produces the
// "Request failed for POST /_bulk" error quoted below.
object AlpakkaWrite extends App{
// Document payload. NOTE: `timestamp` is a Long here (filled from
// System.currentTimeMillis() further down), whereas the index mapping shown
// later in this post declares `timestamp` as a date with the format
// yyyy-MM-dd'T'HH:mm:ss.SSS'Z'. The working version further down switches this
// field to a pre-formatted String.
case class VolResult(symbol : String, vol : Double, timestamp : Long)
implicit val actorSystem = ActorSystem()
// Placeholders: cluster URL and credentials are masked in this post.
val connectionString = "****";
val userName = "****"
val password = "****"
// Selects the ElasticsearchParams variant for the given API version:
// V5 takes both index and type name, V7 takes the index name only (typeName is
// ignored), and any other version throws IllegalArgumentException.
def constructElasticsearchParams(indexName: String, typeName: String, apiVersion: ApiVersion) =
if (apiVersion eq ApiVersion.V5)
ElasticsearchParams.V5(indexName, typeName)
else if (apiVersion eq ApiVersion.V7)
ElasticsearchParams.V7(indexName)
else
throw new IllegalArgumentException("API version " + apiVersion + " is not supported")
val connectionSettings = ElasticsearchConnectionSettings
.create(connectionString).withCredentials(userName, password)
// Write settings pinned to the V7 API, matching the V7 params requested below.
val sinkSettings =
ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V7);
// spray-json format for VolResult, passed explicitly as a three-field format.
// NOTE(review): presumably the sink picks this up implicitly to serialize the
// document body — confirm against the Alpakka docs.
implicit val formatVersionTestDoc: JsonFormat[VolResult] = jsonFormat3(VolResult)
// One-element source; the timestamp sent is the current epoch time in milliseconds.
Source(List(VolResult("test" , 1 , System.currentTimeMillis())))
.map { message: VolResult =>
// Wrap the payload in an index operation with the fixed document id "00002".
WriteMessage.createIndexMessage("00002", message )
}
// NOTE(review): "Error" appears to be only the label of this log stage, not a
// severity level — confirm against the Akka Streams `log` documentation.
.log(("Error"))
.runWith(
ElasticsearchSink.create[VolResult](
constructElasticsearchParams("ccy_vol_normalized", "_doc", ApiVersion.V7),
settings = sinkSettings
)
)
}
Outputs :
19:15:51.815 [default-akka.actor.default-dispatcher-5] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
19:15:52.547 [default-akka.actor.default-dispatcher-5] ERROR akka.stream.alpakka.elasticsearch.impl.ElasticsearchSimpleFlowStage$StageLogic - Received error from elastic after having already processed 0 documents. Error: java.lang.RuntimeException: Request failed for POST /_bulk
Have I defined the case class VolResult correctly? Does it match the expected payload defined in the index mapping?
"properties": {
"timestamp": { "type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
},
"vol": { "type": "float" },
"symbol": { "type": "text" }
}
Using Elastic dev tools the following command will insert a document successfully :
POST ccy_vol_normalized/_doc/
{
"timestamp": "2022-10-21T00:00:00.000Z",
"vol": 1.221,
"symbol" : "SYM"
}
This works :
import akka.actor.ActorSystem
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import akka.stream.scaladsl.Source
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat
import java.text.SimpleDateFormat
import java.util.Date
/**
 * Minimal Alpakka example: indexes a single [[AlpakkaWrite.VolResult]] document
 * into the `ccy_vol_normalized` Elasticsearch index via `ElasticsearchSink`.
 *
 * The document's `timestamp` is sent as a string in the
 * `yyyy-MM-dd'T'HH:mm:ss.SSS'Z'` format, rendered explicitly in UTC so that the
 * trailing `Z` is correct regardless of the JVM's default time zone.
 *
 * Once the stream finishes (successfully or not) the actor system is
 * terminated so the JVM can exit.
 */
object AlpakkaWrite extends App {

  /** Document payload; `timestamp` is a pre-formatted UTC date string. */
  final case class VolResult(symbol: String, vol: Double, timestamp: String)

  // Fill in the cluster URL and credentials before running.
  val connectionString = ""
  val userName = ""
  val password = ""

  implicit val actorSystem: ActorSystem = ActorSystem()

  val connectionSettings = ElasticsearchConnectionSettings
    .create(connectionString)
    .withCredentials(userName, password)

  val sinkSettings =
    ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V7)

  // SimpleDateFormat renders in the JVM default zone unless told otherwise.
  // Pinning it to UTC replaces the previous "subtract one hour, then append Z"
  // workaround, which was only right on a machine running at UTC+1.
  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  timestampFormat.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
  val ts: String = timestampFormat.format(new Date())

  implicit val formatVersionTestDoc: JsonFormat[VolResult] = jsonFormat3(VolResult)

  /**
   * Selects the `ElasticsearchParams` variant for an API version.
   *
   * @param indexName  target index
   * @param typeName   mapping type; only used for V5, ignored for V7
   * @param apiVersion Elasticsearch API version to target
   * @throws IllegalArgumentException for any version other than V5 or V7
   */
  def constructElasticsearchParams(indexName: String, typeName: String, apiVersion: ApiVersion) =
    apiVersion match {
      case ApiVersion.V5 => ElasticsearchParams.V5(indexName, typeName)
      case ApiVersion.V7 => ElasticsearchParams.V7(indexName)
      case other =>
        throw new IllegalArgumentException(s"API version $other is not supported")
    }

  println("ts : " + ts)

  // Materialized value of the sink: completes when the bulk write has finished.
  private val writeResult =
    Source(List(VolResult("test1", 1, ts)))
      .map { message: VolResult =>
        // The current epoch millis doubles as a (practically) unique document id.
        WriteMessage.createIndexMessage(System.currentTimeMillis().toString, message)
      }
      .log("Error")
      .runWith(
        ElasticsearchSink.create[VolResult](
          constructElasticsearchParams("ccy_vol_normalized", "_doc", ApiVersion.V7),
          settings = sinkSettings
        )
      )

  // Report a failed write and always shut the actor system down; otherwise its
  // non-daemon threads keep the JVM alive after the stream has completed.
  writeResult.onComplete { outcome =>
    outcome.failed.foreach(error => actorSystem.log.error(error, "Failed to index document"))
    actorSystem.terminate()
  }(actorSystem.dispatcher)
}
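My date format was incorrect: the first version sent `timestamp` as an epoch-millisecond `Long`, but the index mapping expects a string in the `yyyy-MM-dd'T'HH:mm:ss.SSS'Z'` format. Building the timestamp as a formatted string: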
// Format the current instant explicitly in UTC so the trailing 'Z' is accurate
// whatever the JVM's default time zone is (no hard-coded hour offset needed).
val utcFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
utcFormat.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
val ts = utcFormat.format(new Date())
fixed the issue.