jax-rs jax-ws
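Sometimes it is necessary to retrieve a large data set (for example, more than 1,000,000 records) through JPA, and stuffing all of it into a single java.util.List instance is risky because the heap may run out. So here is a quick solution showing how a JAX-RS REST resource endpoint can still respond in a timely manner by streaming, or serializing, the JPA entities in "pages", without breaking memory constraints.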
Sample Database Table and JPA Entity
Database Table
To demonstrate how to output a large data set, here is a sample MySQL database table we can use.
create database large_data_test_db;
use large_data_test_db;

create table generated_uuids (
    record_no bigint not null auto_increment,
    uuid varchar(100) not null,
    datetime_generated datetime not null,
    primary key(record_no),
    unique(uuid)
);
JPA Entity
Next, define the JPA entity class that represents the table structure above.
Code for GeneratedUuidEntity.java
package com.developerscrappad;

import java.io.Serializable;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;

@Entity
@Table( name = "generated_uuids" )
@NamedQueries( {
    // ORDER BY gives offset-based paging (setFirstResult/setMaxResults) a stable row order;
    // without it, the database is free to return rows in a different order on each round trip
    @NamedQuery( name = "GeneratedUuidEntity.listAll", query = "SELECT u FROM GeneratedUuidEntity u ORDER BY u.recordNo" ),
    @NamedQuery( name = "GeneratedUuidEntity.queryRecordsSize", query = "SELECT count(u) FROM GeneratedUuidEntity u" )
} )
public class GeneratedUuidEntity implements Serializable {

    private static final long serialVersionUID = 12312312234234123L;

    @Id
    @GeneratedValue( strategy = GenerationType.IDENTITY )
    @Column( name = "record_no" )
    private Long recordNo;

    @Column( name = "uuid" )
    private String uuid;

    @Column( name = "datetime_generated" )
    @Temporal( TemporalType.TIMESTAMP )
    private Date datetimeGenerated;

    public GeneratedUuidEntity() {
    }

    public GeneratedUuidEntity( Long recordNo ) {
        this.recordNo = recordNo;
    }

    public GeneratedUuidEntity( Long recordNo, String uuid, Date datetimeGenerated ) {
        this.recordNo = recordNo;
        this.uuid = uuid;
        this.datetimeGenerated = datetimeGenerated;
    }

    public Long getRecordNo() {
        return recordNo;
    }

    public void setRecordNo( Long recordNo ) {
        this.recordNo = recordNo;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid( String uuid ) {
        this.uuid = uuid;
    }

    public Date getDatetimeGenerated() {
        return datetimeGenerated;
    }

    public void setDatetimeGenerated( Date datetimeGenerated ) {
        this.datetimeGenerated = datetimeGenerated;
    }

    @Override
    public int hashCode() {
        int hash = 0;
        hash += ( recordNo != null ? recordNo.hashCode() : 0 );
        return hash;
    }

    @Override
    public boolean equals( Object object ) {
        // TODO: Warning - this method won't work in the case the id fields are not set
        if ( !( object instanceof GeneratedUuidEntity ) ) {
            return false;
        }
        GeneratedUuidEntity other = ( GeneratedUuidEntity ) object;
        if ( ( this.recordNo == null && other.recordNo != null ) || ( this.recordNo != null && !this.recordNo.equals( other.recordNo ) ) ) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "com.developerscrappad.GeneratedUuidEntity[ recordNo=" + recordNo + " ]";
    }
}
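Two named queries are defined in GeneratedUuidEntity. GeneratedUuidEntity.queryRecordsSize queries the total number of records in the table, while GeneratedUuidEntity.listAll retrieves all the records in the table.
Implementing the JAX-RS REST Resource (the Java EE Way)
Let's have a JAX-RS REST resource class named JPAStreamingRESTResource, into which a JPA EntityManager (persistence unit name: JPAStreamingPU) is injected and which exposes it through the protected method getEntityManager().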
@Path( "generated-uuids" )
@Stateless( name = "JPAStreamingRESTResource", mappedName = "ejb/JPAStreamingRESTResource" )
public class JPAStreamingRESTResource {

    @PersistenceContext( unitName = "JPAStreamingPU" )
    private EntityManager entityManager;

    protected EntityManager getEntityManager() {
        return entityManager;
    }

    /**
     * Say "NO" to response caching
     */
    protected Response.ResponseBuilder getNoCacheResponseBuilder( Response.Status status ) {
        CacheControl cc = new CacheControl();
        cc.setNoCache( true );
        cc.setMaxAge( -1 );
        cc.setMustRevalidate( true );
        return Response.status( status ).cacheControl( cc );
    }
}
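In addition, there is a method named getNoCacheResponseBuilder(), which returns a non-caching javax.ws.rs.core.Response.ResponseBuilder so that clients do not end up with odd, stale cached results later on.
JPA Invocation Methods
Next, let's define two methods in the resource class, namely:
queryGeneratedUuidRecordsSize() – retrieves the total number of records in the table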
private int queryGeneratedUuidRecordsSize() {
    return getEntityManager().createNamedQuery( "GeneratedUuidEntity.queryRecordsSize", Long.class ).getSingleResult().intValue();
}
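listAllGeneratedUuidEntities() – retrieves all the data from the table, subject to a few limits: the starting record position (recordPosition) and the maximum number of records per round trip to the database (recordsPerRoundTrip). The aim is to "page" the results so that the result list never grows too large. We'll see it in action later.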
private List<GeneratedUuidEntity> listAllGeneratedUuidEntities( int recordPosition, int recordsPerRoundTrip ) {
    // Typed query: avoids the unchecked conversion from the raw List returned by Query.getResultList()
    return getEntityManager().createNamedQuery( "GeneratedUuidEntity.listAll", GeneratedUuidEntity.class )
            .setFirstResult( recordPosition )
            .setMaxResults( recordsPerRoundTrip )
            .getResultList();
}
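To make the paging parameters concrete, here is a small JDK-only sketch that lists the (firstResult, maxResults) pairs the endpoint's loop will pass to setFirstResult() and setMaxResults(). PagePlan and plan() are hypothetical names, not part of the article, and the sketch assumes the total count does not change while the data is being streamed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the article: enumerates the round trips made by
// the endpoint's while loop, assuming the row count stays fixed during streaming.
final class PagePlan {

    static final class Page {
        final int firstResult;  // value for setFirstResult()
        final int maxResults;   // value for setMaxResults(); always recordsPerRoundTrip
        final int expectedRows; // rows the database should actually return for this page

        Page(int firstResult, int maxResults, int expectedRows) {
            this.firstResult = firstResult;
            this.maxResults = maxResults;
            this.expectedRows = expectedRows;
        }
    }

    static List<Page> plan(int totalRecords, int recordsPerRoundTrip) {
        if (recordsPerRoundTrip <= 0) {
            throw new IllegalArgumentException("recordsPerRoundTrip must be positive");
        }
        List<Page> pages = new ArrayList<>();
        // long avoids int overflow when totalRecords is close to Integer.MAX_VALUE
        for (long position = 0; position < totalRecords; position += recordsPerRoundTrip) {
            int expected = (int) Math.min(recordsPerRoundTrip, totalRecords - position);
            pages.add(new Page((int) position, recordsPerRoundTrip, expected));
        }
        return pages;
    }
}
```

For 567 records and recordsPerRoundTrip = 100, plan() yields six pages starting at offsets 0, 100, 200, 300, 400 and 500; the last one still asks for 100 rows, but the database returns only the remaining 67.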
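Let the Streaming Begin
Now, let's implement the resource endpoint method, which, at least in theory, can retrieve data of any size without exhausting memory. This method returns a JSON response in the following format: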
{
    "result": [
        {
            "record_no": 1,
            "uuid": "34d99089-3e36-4f00-ab93-846b61771eb3",
            "datetime_generated": "2015-06-28 21:02:23"
        },
        …
    ]
}
@GET
@Path( "list-all" )
@Produces( "application/json" )
@TransactionAttribute( TransactionAttributeType.NEVER )
public Response streamGeneratedUuids() {
    // Define the format of timestamp output (final, because the anonymous class below uses it)
    final SimpleDateFormat df = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );

    return getNoCacheResponseBuilder( Response.Status.OK ).entity( new StreamingOutput() {
        // Instruct how StreamingOutput's write method is to stream the data
        @Override
        public void write( OutputStream os ) throws IOException, WebApplicationException {
            int recordsPerRoundTrip = 100; // Number of records for every round trip to the database
            int recordPosition = 0; // Initial record position index
            int recordSize = queryGeneratedUuidRecordsSize(); // Total records found for the query

            // Start streaming the data, encoded as UTF-8 rather than the platform default charset
            try ( PrintWriter writer = new PrintWriter( new BufferedWriter( new OutputStreamWriter( os, StandardCharsets.UTF_8 ) ) ) ) {
                writer.print( "{\"result\": [" );

                while ( recordSize > 0 ) {
                    // Get the paged data set from the DB
                    List<GeneratedUuidEntity> generatedUuidEntities = listAllGeneratedUuidEntities( recordPosition, recordsPerRoundTrip );

                    for ( GeneratedUuidEntity generatedUuidEntity : generatedUuidEntities ) {
                        if ( recordPosition > 0 ) {
                            writer.print( "," );
                        }

                        // Stream the data in Json object format
                        writer.print( Json.createObjectBuilder()
                                .add( "record_no", generatedUuidEntity.getRecordNo() )
                                .add( "uuid", generatedUuidEntity.getUuid() )
                                .add( "datetime_generated", df.format( generatedUuidEntity.getDatetimeGenerated() ) )
                                .build()
                                .toString() );

                        // Increase the recordPosition for every record streamed
                        recordPosition++;
                    }

                    // update the recordSize (remaining no. of records)
                    recordSize -= recordsPerRoundTrip;
                }

                // Done!
                writer.print( "]}" );
            }
        }
    } ).build();
}
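Explanation of the key lines:
It's actually quite simple. The trick is to define an anonymous class expression of StreamingOutput that overrides the write() method. The JAX-RS runtime calls write() when it serializes the response entity, after streamGeneratedUuids() has returned. write() first queries the total record size through queryGeneratedUuidRecordsSize(), then retrieves the records page by page through listAllGeneratedUuidEntities(), writing each record to the output stream as it goes. The method therefore hits the database several times, depending on the value of recordsPerRoundTrip: with 567 records and recordsPerRoundTrip = 100, for example, that is one count query plus 6 paged queries.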
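The loop inside write() can be tried out without an application server. The following is a JDK-only sketch under stated assumptions, not the article's code: PagedJsonStreamer, Row and PageSource are hypothetical names, PageSource stands in for listAllGeneratedUuidEntities(), and datetime_generated is left out for brevity. Only one page of rows is held at a time, and the writer is flushed rather than closed.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// JDK-only sketch of the paging loop in StreamingOutput.write().
// PageSource is a hypothetical stand-in for listAllGeneratedUuidEntities(); it is not a JPA type.
final class PagedJsonStreamer {

    // Simplified row: datetime_generated is omitted to keep the sketch short
    static final class Row {
        final long recordNo;
        final String uuid;

        Row(long recordNo, String uuid) {
            this.recordNo = recordNo;
            this.uuid = uuid;
        }
    }

    interface PageSource {
        List<Row> fetch(int firstResult, int maxResults);
    }

    static void stream(OutputStream os, int totalRecords, int recordsPerRoundTrip, PageSource source)
            throws IOException {
        Writer writer = new BufferedWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8));
        writer.write("{\"result\": [");
        int recordPosition = 0;       // offset of the next page, also the number of rows written so far
        int remaining = totalRecords; // counts down by one page per round trip, as in the article
        while (remaining > 0) {
            List<Row> page = source.fetch(recordPosition, recordsPerRoundTrip);
            if (page.isEmpty()) {
                break; // rows were deleted after the count query; skip the remaining empty queries
            }
            for (Row row : page) {
                if (recordPosition > 0) {
                    writer.write(","); // separator before every element except the first
                }
                // Assumes the values need no JSON escaping (true for numbers and UUID strings)
                writer.write("{\"record_no\": " + row.recordNo + ", \"uuid\": \"" + row.uuid + "\"}");
                recordPosition++;
            }
            remaining -= recordsPerRoundTrip;
        }
        writer.write("]}");
        writer.flush(); // push buffered bytes; closing os is left to whoever opened it
    }
}
```

The early break on an empty page is an addition to the article's loop. The original loop also terminates in that case, because recordSize keeps decreasing, but it would still issue the remaining queries.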
Full source code of JPAStreamingRESTResource.java:
package com.developerscrappad;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.json.Json;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;

@Path( "generated-uuids" )
@Stateless( name = "JPAStreamingRESTResource", mappedName = "ejb/JPAStreamingRESTResource" )
public class JPAStreamingRESTResource {

    @PersistenceContext( unitName = "JPAStreamingPU" )
    private EntityManager entityManager;

    private List<GeneratedUuidEntity> listAllGeneratedUuidEntities( int recordPosition, int recordsPerRoundTrip ) {
        // Typed query: avoids the unchecked conversion from the raw List returned by Query.getResultList()
        return getEntityManager().createNamedQuery( "GeneratedUuidEntity.listAll", GeneratedUuidEntity.class )
                .setFirstResult( recordPosition )
                .setMaxResults( recordsPerRoundTrip )
                .getResultList();
    }

    private int queryGeneratedUuidRecordsSize() {
        return getEntityManager().createNamedQuery( "GeneratedUuidEntity.queryRecordsSize", Long.class ).getSingleResult().intValue();
    }

    protected EntityManager getEntityManager() {
        return entityManager;
    }

    /**
     * Say "NO" to response caching
     */
    protected Response.ResponseBuilder getNoCacheResponseBuilder( Response.Status status ) {
        CacheControl cc = new CacheControl();
        cc.setNoCache( true );
        cc.setMaxAge( -1 );
        cc.setMustRevalidate( true );
        return Response.status( status ).cacheControl( cc );
    }

    @GET
    @Path( "list-all" )
    @Produces( "application/json" )
    @TransactionAttribute( TransactionAttributeType.NEVER )
    public Response streamGeneratedUuids() {
        // Define the format of timestamp output (final, because the anonymous class below uses it)
        final SimpleDateFormat df = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );

        return getNoCacheResponseBuilder( Response.Status.OK ).entity( new StreamingOutput() {
            // Instruct how StreamingOutput's write method is to stream the data
            @Override
            public void write( OutputStream os ) throws IOException, WebApplicationException {
                int recordsPerRoundTrip = 100; // Number of records for every round trip to the database
                int recordPosition = 0; // Initial record position index
                int recordSize = queryGeneratedUuidRecordsSize(); // Total records found for the query

                // Start streaming the data, encoded as UTF-8 rather than the platform default charset
                try ( PrintWriter writer = new PrintWriter( new BufferedWriter( new OutputStreamWriter( os, StandardCharsets.UTF_8 ) ) ) ) {
                    writer.print( "{\"result\": [" );

                    while ( recordSize > 0 ) {
                        // Get the paged data set from the DB
                        List<GeneratedUuidEntity> generatedUuidEntities = listAllGeneratedUuidEntities( recordPosition, recordsPerRoundTrip );

                        for ( GeneratedUuidEntity generatedUuidEntity : generatedUuidEntities ) {
                            if ( recordPosition > 0 ) {
                                writer.print( "," );
                            }

                            // Stream the data in Json object format
                            writer.print( Json.createObjectBuilder()
                                    .add( "record_no", generatedUuidEntity.getRecordNo() )
                                    .add( "uuid", generatedUuidEntity.getUuid() )
                                    .add( "datetime_generated", df.format( generatedUuidEntity.getDatetimeGenerated() ) )
                                    .build()
                                    .toString() );

                            // Increase the recordPosition for every record streamed
                            recordPosition++;
                        }

                        // update the recordSize (remaining no. of records)
                        recordSize -= recordsPerRoundTrip;
                    }

                    // Done!
                    writer.print( "]}" );
                }
            }
        } ).build();
    }
}
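Caution
Remember to adjust the response connection timeout of the application server, to prevent the REST or HTTP client from throwing a java.io.IOException: Premature EOF.
Testing It
To test that it works, simply load 567 records into the table, then have a unit test call the endpoint URL and save the retrieved JSON data to a file. The following unit test code does this with Apache HttpClient: it inserts the records in setUpClass(), downloads and validates the JSON in testJPAStreaming(), and truncates the table in tearDownClass().
Code for JPAStreamingUnitTest.java: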
package com.developerscrappad;

import static org.junit.Assert.*;

import java.io.File;
import java.io.FileInputStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonReader;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class JPAStreamingUnitTest {

    private static final String dbDriverClassname = "com.mysql.jdbc.Driver";
    private static final String dbUrl = "jdbc:mysql://localhost:3306/large_data_test_db";
    private static final String username = "username";
    private static final String password = "password";
    private static final int numberOfRecords = 567;
    private static final String jsonResultOutputFilename = "testing123.json";

    @BeforeClass
    public static void setUpClass() {
        try {
            Class.forName( dbDriverClassname );

            try ( Connection conn = DriverManager.getConnection( dbUrl, username, password ) ) {
                String insertSQL = "insert into generated_uuids (uuid, datetime_generated) values (?, now())";

                try ( PreparedStatement stmt = conn.prepareStatement( insertSQL ) ) {
                    for ( int i = 0; i < numberOfRecords; i++ ) {
                        System.out.println( "Inserting row: " + i );
                        stmt.setString( 1, UUID.randomUUID().toString() );
                        stmt.executeUpdate();
                    }
                }
            }
        } catch ( final Exception ex ) {
            ex.printStackTrace();
            fail( ex.getMessage() );
        }
    }

    @AfterClass
    public static void tearDownClass() {
        try {
            Class.forName( dbDriverClassname );

            // Close the Statement as well as the Connection
            try ( Connection conn = DriverManager.getConnection( dbUrl, username, password );
                  Statement stmt = conn.createStatement() ) {
                String truncateSQL = "truncate generated_uuids";
                stmt.executeUpdate( truncateSQL );
            }

            new File( System.getProperty( "java.io.tmpdir" ), jsonResultOutputFilename ).delete();
        } catch ( final Exception ex ) {
            ex.printStackTrace();
            fail( ex.getMessage() );
        }
    }

    @Test
    public void testJPAStreaming() {
        String url = "http://localhost:8080/JPAStreaming/rest-api/generated-uuids/list-all/";

        try {
            HttpGet httpGet = new HttpGet( url );

            // Close the client as well as the response
            try ( CloseableHttpClient httpclient = HttpClients.createDefault();
                  CloseableHttpResponse response1 = httpclient.execute( httpGet ) ) {
                System.out.println( response1.getStatusLine() );
                HttpEntity entity1 = response1.getEntity();
                Files.copy( entity1.getContent(), FileSystems.getDefault().getPath( System.getProperty( "java.io.tmpdir" ), jsonResultOutputFilename ) );
            }

            // Validate
            try ( JsonReader jsonReader = Json.createReader( new FileInputStream( new File( System.getProperty( "java.io.tmpdir" ), jsonResultOutputFilename ) ) ) ) {
                JsonObject jsonObj = jsonReader.readObject();
                assertTrue( jsonObj.containsKey( "result" ) );

                JsonArray jsonArray = jsonObj.getJsonArray( "result" );
                assertEquals( numberOfRecords, jsonArray.size() );

                SimpleDateFormat validationDF = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );

                for ( int i = 0; i < jsonArray.size(); i++ ) {
                    JsonObject generatedUuidJsonObj = jsonArray.getJsonObject( i );
                    int recordNumber = generatedUuidJsonObj.getInt( "record_no" );
                    assertTrue( recordNumber > 0 );

                    try {
                        UUID.fromString( generatedUuidJsonObj.getString( "uuid" ) );
                    } catch ( IllegalArgumentException ex ) {
                        fail( "Invalid UUID format at record number: " + recordNumber );
                    }

                    try {
                        validationDF.parse( generatedUuidJsonObj.getString( "datetime_generated" ) );
                    } catch ( final NullPointerException | ParseException ex ) {
                        fail( "datetime_generated field must not be null and must be of format yyyy-MM-dd HH:mm:ss" );
                    }
                }
            }
        } catch ( final Exception ex ) {
            ex.printStackTrace();
            fail( ex.getMessage() );
        }
    }
}
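One possible tightening of the timestamp check, sketched under the assumption that Java 8's java.time is available (TimestampCheck and isValid() are hypothetical names): SimpleDateFormat.parse(String) parses from the beginning of the string and does not require the whole string to be consumed, so a value such as "2015-06-28 21:02:23 junk" would pass the test above. LocalDateTime.parse() with a DateTimeFormatter rejects unparsed trailing text.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper (Java 8+): stricter validation of the "datetime_generated" field.
final class TimestampCheck {

    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static boolean isValid(String value) {
        if (value == null) {
            return false;
        }
        try {
            // Fails if the text does not match the pattern or has characters left over
            LocalDateTime.parse(value, FORMAT);
            return true;
        } catch (DateTimeParseException ex) {
            return false;
        }
    }
}
```

In testJPAStreaming(), the validationDF.parse(...) block could then become assertTrue( TimestampCheck.isValid( generatedUuidJsonObj.getString( "datetime_generated" ) ) ).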
That's it. Thanks for reading, and I hope this helps.
Translated from: https://www.javacodegeeks.com/2015/07/how-to-streamserialize-jpa-result-as-jax-rs-response-for-large-data.html