Migrate Data from MySQL to DynamoDB
Directly write to DynamoDB
https://github.com/audienceproject/spark-dynamodb
I was thinking this should work, but it does not work for reading.
%spark.dep
z.load("mysql:mysql-connector-java:5.1.47")
z.load("com.github.traviscrawford:spark-dynamodb:0.0.13")
z.load("com.audienceproject:spark-dynamodb_2.11:0.4.1")
This read does not work:
import com.audienceproject.spark.dynamodb.implicits._
val accountDF = spark.read.option("region", "us-west-1").dynamodb("account-int-accounts")
accountDF.printSchema()
accountDF.show(2)
This read works:
import com.github.traviscrawford.spark.dynamodb._
val accountDF = sqlContext.read.dynamodb("us-west-1", "account-int-accounts")
accountDF.printSchema()
accountDF.show(1)
This works for writing data, but I do not think it handles the write capacity well (see the throttling sketch after this block).
%spark.dep
z.load("mysql:mysql-connector-java:5.1.47")
z.load("com.github.traviscrawford:spark-dynamodb:0.0.13")
z.load("com.audienceproject:spark-dynamodb_2.11:0.4.1")
z.load("com.google.guava:guava:14.0.1")
import com.github.traviscrawford.spark.dynamodb._
val accountDF = sqlContext.read.dynamodb("us-west-1", "account-int-accounts")
accountDF.printSchema()
accountDF.show(1)
import com.audienceproject.spark.dynamodb.implicits._
accountDF.write.option("region", "us-west-1").dynamodb("account-int-accounts2")
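If the write keeps outrunning the table's provisioned write capacity, one blunt workaround is to reduce the number of concurrent writer tasks before writing. The connector's README also documents throughput-related write options, but the option name below is an assumption taken from later releases, so verify it against the version you load:

import com.audienceproject.spark.dynamodb.implicits._

// Fewer partitions means fewer concurrent batch writers hitting DynamoDB.
val throttledDF = accountDF.coalesce(2)

// "throughput" is documented in the connector's README for later releases;
// verify the option name for the version you actually use.
throttledDF.write
  .option("region", "us-west-1")
  .option("throughput", "100")
  .dynamodb("account-int-accounts2")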
With com.google.guava:guava:14.0.1 loaded, the audienceproject read works as well:
import com.audienceproject.spark.dynamodb.implicits._
val dynamoDF = spark.read.option("region", "us-west-1").dynamodb("account-int-accounts")
dynamoDF.printSchema()
dynamoDF.show(5)
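Putting it together: the mysql-connector-java dependency loaded above is for the source side of the migration. A minimal end-to-end sketch, with placeholder host, database, table, and credentials, would look like this:

import com.audienceproject.spark.dynamodb.implicits._

// Read the source table from MySQL via Spark JDBC.
// The URL, table name, and credentials are placeholders for this sketch.
val mysqlDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://mysql-host:3306/accounts_db")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "account")
  .option("user", "dbuser")
  .option("password", "dbpassword")
  .load()

// Write the rows straight into the target DynamoDB table.
mysqlDF.write.option("region", "us-west-1").dynamodb("account-int-accounts2")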
DynamoDB Format and AWS Command
https://github.com/lmammino/json-dynamo-putrequest
First of all, prepare the JSON file on the server; usually I download it from HDFS:
> hdfs dfs -get hdfs://localhost:9000/mysqltodynamodb/account2 ./account2
Find the JSON file account2.json.
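That HDFS directory is presumably the output of a Spark JSON export, something like this one-liner on the DataFrame read earlier:

// Writes the DataFrame as JSON Lines (one object per line) to HDFS;
// the path matches the hdfs dfs -get command above.
accountDF.write.json("hdfs://localhost:9000/mysqltodynamodb/account2")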
Install Node.js if it is not on the system:
> sudo apt install nodejs
> sudo apt install npm
> node --version && npm --version
v8.10.0
3.5.2
Install the software:
> sudo npm install --global json-dynamo-putrequest
Check the installation:
> json-dynamo-putrequest --help
> json-dynamo-putrequest --version
1.0.0
Command
> json-dynamo-putrequest account-int-accounts2 --output account-dynamo.json < account2.json
Error: Input data needs to be an array
The export is JSON Lines, one object per line, while the tool expects a JSON array: add [ and ] around the content, change the } at the end of each line to }, and try the data again (a scripted version of this edit is sketched below).
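If you would rather script that edit than do it by hand, a small Scala sketch (reading account2.json and writing a new account2-array.json to feed to the tool):

import scala.io.Source
import java.io.PrintWriter

// Wrap the JSON Lines file into a single JSON array: join the per-line
// objects with commas and surround the whole thing with [ and ].
val lines = Source.fromFile("account2.json").getLines().toSeq
val writer = new PrintWriter("account2-array.json")
writer.println(lines.mkString("[", ",\n", "]"))
writer.close()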
> json-dynamo-putrequest account-int-accounts2 --output account-dynamo.json < account2.json
Output saved in /home/ubuntu/data/account-dynamo.json
The file is ready as account-dynamo.json.
Then follow the documentation to import the data into the table:
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/SampleData.LoadData.html
Create a table from the console, then execute the import command:
> aws dynamodb batch-write-item --request-items file:///home/ubuntu/data/account-dynamo.json
at 'requestItems' failed to satisfy constraint: Map value must satisfy constraint: [Member must have length less than or equal to 25, Member must have length greater than or equal to 1]
Haha, in the docs all the samples have fewer than 25 items; batch-write-item accepts at most 25 put requests per call, so a large file has to be split.
Directly writing Node.js to parse the JSON file and do the import in batches works:
"usestrict";
//Howtorun
//nodedynamodb-scripts/import-devices-to-dynamodb.js{ENV}./css_devices_only_csv.txt
//eg:nodedynamodb-scripts/import-devices-to-dynamodb.jsint./css_devices_only_csv.txt
varimportDevicesToDynamo;
process.env.AWS_SDK_LOAD_CONFIG=true;
(function(importDevicesToDynamo){
constfs=require('fs');
constbabyparse=require("babyparse");
constAWS=require("aws-sdk");
constlog4js=require('log4js');
constlogger=log4js.getLogger();
constsleep=require('sleep');
constenv=process.argv[2];//Mustbeint,stageorprod
constcsvFilePath=process.argv[3];
constconfig={
delimiter:',',
newline:"",
quoteChar:'"',
header:true,
dynamicTyping:false,
preview:0,
encoding:"utf8",
worker:false,
comments:false,
skipEmptyLines:true
};
lettableName=`lifesize_device-${env}-devicePairingInfo`;
letaccessKey="";
letsignatureKey="";
letregion="";
letdynamoDbUrl="";
//validateparameters
if(!env){
console.log("\nMustpassinenvironmentfor1stargument.Mustbeoneof'int,'stage'or'prod'");
console.log("\nUsage-nodedynamodb-scripts/import-devices-to-dynamodb.js{env}{csvpath/file}");
console.log("\nExample-nodedynamodb-scripts/import-devices-to-dynamodb.jsint./css_devices_only_csv.txt\n");
process.exit(1);
}
if(!csvFilePath){
console.log("\nMustpassincsvFilePathfor2ndargument.");
console.log("\nUsage-nodedynamodb-scripts/import-devices-to-dynamodb.js{env}{csvpath/file}");
console.log("\nExample-nodedynamodb-scripts/import-devices-to-dynamodb.jsint./css_devices_only_csv.txt\n");
process.exit(2);
}
console.log("Env="+env);
console.log("Filetoimport="+csvFilePath);
letcontent=fs.readFileSync(csvFilePath,config);
letparsed=babyparse.parse(content,config);
letrows=JSON.parse(JSON.stringify(parsed.data));
console.log("Rowcount="+Object.keys(rows).length);
let_id;
//Forthebatchsizeof10,weneedtotemporarilychangethewritecapacityunitsto50inDynaoDBfortheappropriatetable,thenresettodefaultwhenscriptisfinished
letsize=10;
console.log("dynamoDbURL="+dynamoDbUrl);
console.log("tablename="+tableName);
varcredentials=newAWS.SharedIniFileCredentials();
AWS.config.credentials=credentials;
constdynamoDb=newAWS.DynamoDB.DocumentClient();
letuniqueSerialNumbers=[];
for(leti=0;i<rows.length;i+=size){
//Slicethearrayintosmallerarraysof10,000items
letsmallarray=rows.slice(i,i+size);
console.log("i="+i+"serialNumber="+smallarray[0].serialNumber);
letbatchItems=smallarray.map(function(item){
try{
constserialNumber=item.serialNumber;
if(uniqueSerialNumbers.includes(serialNumber)){
//console.log("Systemignoreduplicatedrecord",item);
returnnull;
}else{
uniqueSerialNumbers.push(serialNumber);
}
//Replaceemptystringvalueswithnull.DynamoDBdoesn'tallowemptystrings,willthrowerroronrequest.
for(letitemsinitem){
letvalue=item[items];
if(value===undefined||value===""){
item[items]=null;
}
if(items=="enabled"){
if(value==="f"){
item[items]=false;
}elseif(value==="t"){
item[items]=true;
}
}
}
item.adminAccountUUID=null;
item.sessionID=null;
item.pairingCodeCreateTime=null;
if(item.systemName===null){
item.systemName=item.userExtension.toString()
}
if(item.pairingstatus==='DEFAULT'){
item.pairingstatus="COMPLETE"
}
if(item.plaform==='GRAPHITE'){
item.deviceUUID=item.serialNumber
}
if(item.userExtension&&!item.extension){
item.extension=item.userExtension.toString();
console.log(`++++++++++++++++++++++++++++++++++++`);
}
letparams={
PutRequest:{Item:JSON.parse(JSON.stringify(item))}
};
console.log("params="+JSON.stringify(params,null,2));
returnparams;
}
catch(error){
console.log("****ERRORprocessingfile:"+error);
}
}).filter((obj)=>
obj!==null
);
if(batchItems.length===0){
console.log("Systemfilterallthedupicatedata,nothingleft");
continue;
}
letbatchRequestParams='{"RequestItems":{"'+tableName+'":'+JSON.stringify(batchItems)+'},"ReturnConsumedCapacity":"TOTAL","ReturnItemCollectionMetrics":"SIZE"}';
console.log("batchRequestParams============================================================");//+batchRequestParams);
callDynamo(batchRequestParams).then(function(data){
sleep.msleep(100);
}).catch(console.error);
}
functioncallDynamo(batchRequestParams){
returnnewPromise(function(resolve,reject){
dynamoDb.batchWrite(JSON.parse(batchRequestParams),function(err,data){
try{
if(err){
logger.error(`Error-${err}=Tryingagain:`);
sleep.msleep(100);
dynamoDb.batchWrite(JSON.parse(batchRequestParams),function(err,data){
try{
if(err){
//console.log("-------------dataisbeauty:",batchRequestParams);
logger.error("Unabletoadditema2ndtime,Error:",err);
returnreject(err);
}
else{
logger.debug("2ndPutItemsucceeded");
resolve(data);
}
}
catch(error){
//console.log("-------------dataishere:",batchRequestParams);
console.log("errorcallingDynamoDB-"+error);
returnreject(err);
}
});
}
else{
logger.debug("PutItemsucceeded");
resolve(data);
}
}
catch(error){
console.log("errorcallingDynamoDB-"+error);
returnreject(err);
}
});
});
}
})(importDevicesToDynamo||(importDevicesToDynamo={}));
References:
https://github.com/audienceproject/spark-dynamodb
https://stackoverflow.com/questions/37444607/writing-from-spark-to-dynamodb
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/SampleData.LoadData.html
https://github.com/lmammino/json-dynamo-putrequest