I want to use Amazon Personalize for my app's recommendation model. To get my app's current analytics data, I connected Pinpoint to Kinesis Data Firehose, as explained in this documentation.
But after connecting Kinesis Data Firehose to Pinpoint,
Pinpoint sends data to Kinesis, but the output is different from what I want.
Kinesis settings:
And the output I get:
Is there any other way to send data from Pinpoint to Personalize so that I can start the campaign? Once the campaign has started, I can send data through the campaign, according to the documentation.
Since the shape and content of Pinpoint events are different from the format of interactions required by Personalize (either imported in bulk as an interactions CSV or incrementally via the PutEvents API), some transformation is required to get these events into the right format. The solution you noted uses periodic bulk imports: Athena extracts the event data saved in S3 (through Kinesis Firehose) and formats it into the CSV layout expected by Personalize, and the result is then imported into Personalize. You can find the Athena named queries in the CloudFormation template for the solution here.
WITH evs AS (
SELECT
client.client_id as endpoint_id,
attributes.campaign_id as campaign_id,
event_type,
arrival_timestamp
FROM event
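WHERE
-- Outer parentheses make the event_type filter apply to both date-scope branches (AND binds tighter than OR).
(
(
${InteractionsQueryDateScope} > 0
AND arrival_timestamp >= date_add('day', -1, CURRENT_DATE)
AND arrival_timestamp < CURRENT_DATE
) OR (
${InteractionsQueryDateScope} = -1
)
)
AND
event_type != '_custom.recommender'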
),
recs AS (
SELECT
attributes.personalize_user_id as personalize_user_id,
client.client_id as endpoint_id,
attributes.campaign_id as campaign_id,
attributes.item_id as item_id,
event_type,
arrival_timestamp
FROM event
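WHERE
-- Outer parentheses make the event_type filter apply to both date-scope branches (AND binds tighter than OR).
(
(
${InteractionsQueryDateScope} > 0
AND arrival_timestamp >= date_add('day', -1, CURRENT_DATE)
AND arrival_timestamp < CURRENT_DATE
) OR (
${InteractionsQueryDateScope} = -1
)
)
AND
event_type = '_custom.recommender'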
)
SELECT
r.personalize_user_id as USER_ID,
r.item_id AS ITEM_ID,
b.event_type AS EVENT_TYPE,
v.EVENT_VALUE,
CAST(to_unixtime(b.arrival_timestamp) AS BIGINT) AS TIMESTAMP
FROM endpoint_export a
INNER JOIN recs r
ON a.id = r.endpoint_id
INNER JOIN evs b
ON a.id = b.endpoint_id AND r.campaign_id = b.campaign_id
INNER JOIN event_value v
ON b.event_type = v.event_type
Here is how the tables are created in the Glue Data Catalog.
CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.event (
client struct<client_id:string>,
attributes struct<campaign_id:string, item_id:string, personalize_user_id:string>,
event_type string,
arrival_timestamp timestamp
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://${DataS3Bucket}/events/'
TBLPROPERTIES ('has_encrypted_data'='false');
CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.endpoint_export (
id string,
channeltype string,
address string,
endpointstatus string,
optout string,
effectivedate string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://${DataS3Bucket}/endpoint_exports/'
TBLPROPERTIES ('has_encrypted_data'='false');
CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.event_value (
event_type string,
event_value double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = ',',
'field.delim' = ','
) LOCATION 's3://${DataS3Bucket}/event_values/'
TBLPROPERTIES ('has_encrypted_data'='false', 'skip.header.line.count'='1');
An alternative to the Kinesis Firehose/S3/Athena approach is to write a Lambda function that consumes Pinpoint events directly from the Kinesis data stream and transforms each Pinpoint event into a PutEvents API call to a Personalize event tracker. Once you have accumulated enough interaction data, you can create a solution, solution version, and campaign.
The minimal fields you need from the Pinpoint events for Personalize are USER_ID, ITEM_ID, and TIMESTAMP. The USER_ID will most likely be the Pinpoint endpoint (client.client_id), the ITEM_ID will likely be passed through Pinpoint as an attribute (attributes.item_id), and TIMESTAMP will be arrival_timestamp. You can also use the Pinpoint event_type as the EVENT_TYPE in Personalize. The SQL statement above shows how this is done using Athena, but you can also do it in code in the Lambda for each mini-batch of events that you consume off the Kinesis data stream.
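As a rough illustration of the Lambda approach, here is a minimal Python sketch of the field mapping described above. It is not taken from the referenced solution. It assumes the Pinpoint event arrives as JSON in a Kinesis record, that arrival_timestamp is in epoch milliseconds, and that the item ID is sent as attributes.item_id. TRACKING_ID, to_put_events_args, and the optional personalize_events parameter are hypothetical names chosen for this example, and falling back to the endpoint ID as the session ID is likewise an assumption.

```python
import base64
import json
from datetime import datetime, timezone

# Hypothetical placeholder: the tracking ID of your Personalize event tracker.
TRACKING_ID = "YOUR_EVENT_TRACKER_ID"


def to_put_events_args(pinpoint_event):
    """Map one Pinpoint event (dict) to PutEvents keyword arguments.

    Returns None when a field required by Personalize is missing.
    """
    client_id = (pinpoint_event.get("client") or {}).get("client_id")
    attributes = pinpoint_event.get("attributes") or {}
    item_id = attributes.get("item_id")
    arrival_ms = pinpoint_event.get("arrival_timestamp")  # assumed epoch millis
    if not client_id or not item_id or arrival_ms is None:
        return None

    # Assumption: use the Pinpoint session ID if present, else the endpoint ID.
    session = pinpoint_event.get("session") or {}
    session_id = session.get("session_id") or client_id

    return {
        "userId": client_id,                      # USER_ID
        "sessionId": session_id,
        "eventList": [{
            "eventType": pinpoint_event.get("event_type", "unknown"),  # EVENT_TYPE
            "itemId": item_id,                    # ITEM_ID
            "sentAt": datetime.fromtimestamp(arrival_ms / 1000, tz=timezone.utc),
        }],
    }


def handler(event, context, personalize_events=None):
    """Lambda entry point for a Kinesis data stream trigger."""
    if personalize_events is None:
        import boto3  # available in the Lambda Python runtime
        personalize_events = boto3.client("personalize-events")

    sent = 0
    for record in event["Records"]:
        # Kinesis delivers each record's payload base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        args = to_put_events_args(payload)
        if args is None:
            continue  # skip events that cannot be mapped to an interaction
        personalize_events.put_events(trackingId=TRACKING_ID, **args)
        sent += 1
    return {"sent": sent}
```

This sends one PutEvents call per Pinpoint event for simplicity. In practice you would probably group events by user and session and add retry and error handling, which the sketch leaves out.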