DBInputFormat за прехвърляне на данни от SQL в база данни NoSQL



Целта на този блог е да научи как да прехвърляте данни от SQL бази данни в HDFS, как да прехвърляте данни от SQL бази данни в NoSQL бази данни.

В този блог ще изследваме възможностите и възможностите на един от най-важните компоненти на технологията Hadoop, т.е. MapReduce.

Днес компаниите възприемат Hadoop framework като своя първи избор за съхранение на данни, поради способностите му да обработва ефективно големи данни. Но също така знаем, че данните са универсални и съществуват в различни структури и формати. За да се контролира такова огромно разнообразие от данни и различните им формати, трябва да има механизъм, който да побере всички разновидности и все пак да доведе до ефективен и последователен резултат.





Най-мощният компонент в рамката на Hadoop е MapReduce, който може да осигури контрола върху данните и тяхната структура по-добре от останалите аналози. Въпреки че изисква допълнителни криви на обучение и сложността на програмирането, ако можете да се справите с тези сложности, със сигурност можете да обработвате всякакъв вид данни с Hadoop.

Рамката MapReduce разбива всички свои задачи по обработка на две фази: Map и Reduce.



Подготовката на вашите сурови данни за тези фази изисква разбиране на някои основни класове и интерфейси. Супер класът за тези преработки е InputFormat.

The InputFormat class е един от основните класове в API на Hadoop MapReduce. Този клас е отговорен за дефинирането на две основни неща:

  • Данните се разделят
  • Четец на записи

Разделяне на данните е основна концепция в рамката на Hadoop MapReduce, която определя както размера на отделните задачи на картата, така и потенциалния й сървър за изпълнение. The Четец на записи отговаря за действителните записи за четене от входния файл и ги изпраща (като двойки ключ / стойност) на mapper.



Броят на картографиращите се определя въз основа на броя разделяния. Работата на InputFormat е да създава разделянията. По-голямата част от времето за разделяне е еквивалентно на размера на блока, но не винаги се създават разделения въз основа на размера на блока HDFS. Това напълно зависи от това как методът getSplits () на вашия InputFormat е заменен.

Има фундаментална разлика между MR split и HDFS block. Блокът е физически парче от данни, докато разделянето е просто логически парче, което картограф чете. Сплитът не съдържа входните данни, той просто съдържа референция или адрес на данните. Разделението основно има две неща: дължина в байтове и набор от места за съхранение, които са просто низове.

За да разберем това по-добре, нека вземем един пример: Обработка на данни, съхранявани във вашия MySQL с помощта на MR. Тъй като в този случай няма концепция за блокове, теорията: „разделянето винаги се създава въз основа на HDFS блока“,се проваля. Една от възможностите е да се създадат разделяния въз основа на диапазони от редове във вашата MySQL таблица (и това прави DBInputFormat, входен формат за четене на данни от релационни бази данни). Може да имаме k брой разделения, състоящи се от n реда.

Само за InputFormats, базирани на FileInputFormat (InputFormat за обработка на данни, съхранявани във файлове), разделянията се създават въз основа на общия размер, в байтове, на входните файлове. Размерът на блоковете на FileSystem на входните файлове обаче се третира като горна граница за входни разделяния. Ако имате файл, по-малък от размера на блока HDFS, ще получите само 1 картографиране за този файл. Ако искате да имате различно поведение, можете да използвате mapred.min.split.size. Но това отново зависи единствено от getSplits () на вашия InputFormat.

Имаме толкова много съществуващи входни формати, налични в пакета org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

c ++ сортиране ()

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

какво е hover в css

TextInputFormat.html

По подразбиране е TextInputFormat.

По същия начин имаме толкова много изходни формати, които четат данните от редуктори и ги съхраняват в HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

По подразбиране е TextOutputFormat.

Докато приключите с четенето на този блог, ще сте научили:

  • Как да напиша програма за намаляване на картата
  • За различните видове InputFormats, налични в Mapreduce
  • Каква е необходимостта от InputFormats
  • Как да напиша потребителски InputFormats
  • Как да прехвърля данни от SQL бази данни в HDFS
  • Как да прехвърля данни от SQL (тук MySQL) бази данни в NoSQL бази данни (тук Hbase)
  • Как да прехвърля данни от една SQL база данни в друга таблица в SQL бази данни (Може би това не е толкова важно, ако правим това в една и съща база данни на SQL. Въпреки това, няма нищо лошо в познаването на същата. Никога не знаете как може да влезе в употреба)

Предпоставка:

  • Предварително инсталиран Hadoop
  • SQL предварително инсталиран
  • Предварително инсталиран Hbase
  • Основно разбиране за Java
  • Map Намалете знанията
  • Основни познания на Hadoop framework

Нека разберем декларацията за проблема, която ще решим тук:

Имаме таблица на служителите в MySQL DB в нашата релационна база данни Edureka. Сега според бизнес изискването трябва да прехвърлим всички данни, налични в релационната DB, във файловата система Hadoop, т.е. HDFS, NoSQL DB, известна като Hbase.

Имаме много възможности за изпълнение на тази задача:

  • Sqoop
  • Флум
  • MapReduce

Сега не искате да инсталирате и конфигурирате друг инструмент за тази операция. Остава ви само една опция, която е рамката за обработка на Hadoop MapReduce. Рамката MapReduce ще ви даде пълен контрол върху данните при прехвърляне. Можете да манипулирате колоните и да поставяте директно във всяко от двете целеви места.

Забележка:

java какви са променливите на екземпляра
  • Трябва да изтеглим и поставим конектора MySQL в пътя на класа на Hadoop, за да извлечем таблици от таблицата MySQL. За целта изтеглете конектора com.mysql.jdbc_5.1.5.jar и го запазете в директорията Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Също така поставете всички буркани Hbase под Hadoop classpath, за да накарате вашата програма за MR да има достъп до Hbase. За да направите това, изпълнете следната команда :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Версиите на софтуера, които използвах при изпълнението на тази задача, са:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Затъмнение Луна

За да избегна програмата при проблем със съвместимостта, предписвам на читателите си да изпълняват командата с подобна среда.

Персонализиран DBInputWritable:

пакет com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.iodo.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implements Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) хвърля IOException {} public void readFields (ResultSet) хвърля SQLException // Resultset обект представлява данните, върнати от SQL израз {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) хвърля IOException { } public void write (PreparedStatement ps) хвърля SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} публичен низ getDept () {return dept}}

Персонализиран DBOutputWritable:

пакет com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.iodo.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implements Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = име this.id = id this.dept = dept} public void readFields (DataInput in) хвърля IOException {} public void readFields (ResultSet rs) хвърля SQLException {} public void write (DataOutput out) хвърля IOException {} public void write (PreparedStatement ps) хвърля SQLException {ps.setString (1, име) ps.setInt (2, id) ps.setString (3, dept)}}

Таблица за въвеждане:

създаване на база данни
създаване на таблица emp (empid int не null, име varchar (30), dept varchar (20), първичен ключ (empid))
вмъкнете в стойности на emp (1, 'abhay', 'developement'), (2, 'brundesh', 'test')
изберете * от emp

Случай 1: Прехвърляне от MySQL към HDFS

пакет com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable публичен клас MainDbtohdfs {public static void main (String [] args) хвърля изключение {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // клас на драйвер' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // потребителско име' root ') // парола Задание = нова работа (conf) .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.putmat. нов път (аргументи [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // име на входна таблица null, null, new String [] {'empid', 'name', 'dept'} / / таблица колони) Път p = нов път (аргументи [0]) FileSystem fs = FileSystem.get (нов URI (аргументи [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Тази част от кода ни позволява да подготвим или конфигурираме входния формат за достъп до нашия източник на SQL DB. Параметърът включва класа на драйвера, URL адресът има адреса на базата данни на SQL, потребителското му име и паролата.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // клас на драйвер 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // потребителско име 'root') // парола

Тази част от кода ни позволява да предадем подробностите за таблиците в базата данни и да ги зададем в обекта на заданието. Параметрите включват, разбира се, екземпляра на заданието, потребителския клас за записване, който трябва да реализира DBWritable интерфейс, името на таблицата източник, условието, ако има друго null, параметрите за сортиране else null, съответно списъка с колони в таблицата.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // име на въведена таблица null, null, new String [] {'empid', 'name', 'dept'} // колони на таблица)

Картограф

пакет com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable Public class Map разширява Mapper {
защитена void карта (LongWritable ключ, DBInputWritable стойност, Context ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (нов текст (име + '+ id +' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Редуктор: Използва се редуктор на идентичност

Команда за изпълнение:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Изход: Таблица MySQL прехвърлена в HDFS

hadoop dfs -ls / dbtohdfs / *

Случай 2: Прехвърляне от една таблица в MySQL на друга в MySQL

създаване на изходна таблица в MySQL

създаване на таблица служител1 (име varchar (20), id int, dept varchar (20))

пакет com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) хвърля изключение {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // class class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // потребителско име' root ') // парола Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Намаляване.клас) job.setMapOutputKeyClass (Текст.клас) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (NET). lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // име на таблица за въвеждане null, null, new String [] {'empid ',' name ',' dept '} // колони на таблица) DBOutputFormat.setOutput (job,' worker1 ', // output table name new String [] {' name ',' id ',' dept '} // таблица колони) System.exit (job.waitForCompletion (вярно)? 0: 1)}}

Тази част от кода ни позволява да конфигурираме името на изходната таблица в SQL DB. Параметрите са съответно екземпляр на работа, име на изходна таблица и имена на изходни колони.

DBOutputFormat.setOutput (job, 'worker1', // изходно име на таблица new String [] {'name', 'id', 'dept'} // колони на таблица)

Mapper: Същото като в случай 1

Редуктор:

пакет com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable публичен клас Намалява разширява Редуктор {защитена невалидна редукция (Текстов ключ, Итерируеми стойности, Контекст ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (ред [0] .toString (), Integer.parseInt (ред [1] .toString ()), ред [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Команда за изпълнение:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Резултат: Прехвърлени данни от EMP таблица в MySQL на друг служител на таблица1 в MySQL

Случай 3: Прехвърляне от таблица в MySQL към таблица NoSQL (Hbase)

Създаване на таблица Hbase за побиране на изхода от SQL таблицата:

създайте 'служител', 'official_info'

Клас на водача:

пакет Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text публичен клас MainDbToHbase {public static void main (String [] args) хвърля изключение {Configuration conf = config. HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // клас на драйвер 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // потребителско име 'root') // парола Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('служител', Reduce.class. клас) DBInputFormat.setInput (работа, DBInputWritable.class, 'emp', // име на таблица за въвеждане null, null, нов String [] {'empid', 'name', 'dept'} // колони на таблица) System.exit (job.waitForCompletion (вярно)? 0: 1)}}

Тази част от кода ви позволява да конфигурирате класа на изходния ключ, който в случай на hbase е ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Тук предаваме името на таблицата hbase и редуктора, за да действа върху таблицата.

TableMapReduceUtil.initTableReducerJob ('служител', Reduce.class, работа)

Картограф:

пакет Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable публичен клас Map extends Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

В тази част от кода вземаме стойности от гетерите на класа DBinputwritable и след това ги предаваме
ImmutableBytesWritable, така че те да достигнат до редуктора в байтабелна форма, която Hbase разбира.

String line = value.getName () String cd = value.getId () + 'String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line +' '+ dept ))

Редуктор:

пакет Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text публичен клас Намалява разширява TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) хвърля IOException, InterruptedException {String [] причина = null // Loop стойности за (Текст val: стойности) {причина = val.toString (). split ('')} // Поставяне в HBase Поставяне put = ново Поставяне (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('име'), Bytes.toBytes (причина [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('отдел'), Bytes.toBytes (причина [1 ])) context.write (ключ, поставяне)}}

Тази част от кода ни позволява да решим точния ред и колоната, в която ще съхраняваме стойности от редуктора. Тук съхраняваме всеки empid в отделен ред, както направихме empid като ключ на ред, който би бил уникален. Във всеки ред съхраняваме официалната информация на служителите под семейство колони „official_info“ съответно под колони „име“ и „отдел“.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (причина [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('отдел'), Bytes.toBytes (причина [1])) context.write (ключ, пуснат)

Прехвърлени данни в Hbase:

сканирайте служител

Както виждаме, успяхме да изпълним успешно задачата за мигриране на нашите бизнес данни от релационна SQL DB в NoSQL DB.

В следващия блог ще научим как да пишете и изпълнявате кодове за други входни и изходни формати.

Продължавайте да публикувате вашите коментари, въпроси или всякакви отзиви. Бих се радвал да чуя от вас.

Имате въпрос към нас? Моля, споменете го в раздела за коментари и ние ще се свържем с вас.

Подобни публикации: