I asked a similar question a while ago and thought I had solved this problem, but it turned out that it went away simply because I was working on a smaller dataset.

Numerous people have asked this question and I have gone through every single internet post that I could find and still didn't make any progress.

What I'm trying to do is this: I have an external table browserdata in Hive that refers to about 1 gigabyte of data. I am trying to insert that data into a partitioned table partbrowserdata, which is defined like this:

CREATE EXTERNAL TABLE IF NOT EXISTS partbrowserdata (
    BidID string,
    Timestamp_ string,
    iPinYouID string,
    UserAgent string,
    IP string,
    RegionID int,
    AdExchange int,
    Domain string,
    URL string,
    AnonymousURL string,
    AdSlotID string,
    AdSlotWidth int,
    AdSlotHeight int,
    AdSlotVisibility string,
    AdSlotFormat string,
    AdSlotFloorPrice decimal,
    CreativeID string,
    BiddingPrice decimal,
    AdvertiserID string,
    UserProfileIDs array<string>
)
PARTITIONED BY (CityID int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/maria_dev/data2';

with this query:

insert into table partbrowserdata partition(cityid)
select BidID, Timestamp_, iPinYouID, UserAgent, IP, RegionID, AdExchange, Domain, URL, AnonymousURL, AdSlotID, AdSlotWidth, AdSlotHeight, AdSlotVisibility, AdSlotFormat, AdSlotFloorPrice, CreativeID, BiddingPrice, AdvertiserID, UserProfileIDs, CityID
from browserdata;

And every time, on every platform, be it Hortonworks or Cloudera, I get this message:

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/maria_dev/data2/.hive-staging_hive_2019-02-06_18-58-39_333_7627883726303986643-1/_task_tmp.-ext-10000/cityid=219/_tmp.000000_3 could only be replicated to 0 nodes instead of minReplication (=1).  There are 4 datanode(s) running and no node(s) are excluded in this operation.
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1720)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3389)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:683)
        at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:214)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:495)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:413)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1814)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1610)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:773)

What do I do? I can't understand why this is happening. It does seem like a memory issue, though, because I am able to insert a few rows but not all of them. Note that I have plenty of free space on HDFS, so 1 gig of extra data is pennies on the dollar. So it's probably a RAM issue?

Here's my dfs report output:

[Screenshot: dfs report output]

I have tried this on all execution engines: spark, tez, mr.

Please do not suggest solutions that say that I need to format the namenode, because they do not work, and they are not solutions in any way.

update:

After looking at logs for namenode I noticed this, if it helps:

Failed to place enough replicas, still in need of 1 to reach 1 (unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=true) All required storage types are unavailable: unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}

These logs suggest this:

For more information, please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy and org.apache.hadoop.net.NetworkTopology

How do I do that?

I also noticed a similar unresolved post on here:

HDP 2.2@Linux/CentOS@OracleVM (Hortonworks) fails on remote submission from Eclipse@Windows

update 2:

I just tried partitioning this with Spark, and it works! So, this must be a Hive bug...

update 3:

Just tested this on MapR and it worked, but MapR doesn't use HDFS. This is definitely some sort of HDFS + Hive combination bug.

Proof:

[Screenshot: the insert succeeding in Hue on MapR]

pavel_orekhov
  • Did you try solution from this post (http://hadoopinrealworld.com/could-only-be-replicated-to-0-nodes/)? **Bottom line from this post**: "Your Datanodes could be live and healthy and the communication between the Namenode and Datanode(s) could be OK but if the Client which is writing to HDFS has trouble communicating with the Datanode then we will have a problem." – Jainik Feb 06 '19 at 20:53
  • @Jainik I have seen this post. Please notice that in my post I say that I am able to insert a small dataset, it doesn't work on large ones, so it is definitely not a connectivity issue. – pavel_orekhov Feb 06 '19 at 21:47
  • Set these properties before you execute your query: `mapred.compress.map.output=true mapred.output.compress=true hive.auto.convert.join=true hive.exec.parallel=true` – Jainik Feb 06 '19 at 22:38
  • @Jainik thanks, I will try these, but what if I don't wanna use mapreduce? What if I wanna run this on tez or spark? – pavel_orekhov Feb 06 '19 at 22:48
  • @Jainik I just tried setting those options before executing the query and unfortunately they did not work. – pavel_orekhov Feb 06 '19 at 22:56
  • You can use (https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Tez) to identify properties you need to use for TEZ, Spark properties can be found here (https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Spark) – Jainik Feb 06 '19 at 22:57
  • @Jainik it seems that these are mostly optimization options, however in my case I don't want to just optimize, I want to make it work even if the scale of my data grows. – pavel_orekhov Feb 06 '19 at 23:00
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/188024/discussion-between-jainik-and-hey-you). – Jainik Feb 06 '19 at 23:02
  • Comment on update 2: Can you try to insert in bucketed (non-partitioned) table in hive? – Jainik Feb 07 '19 at 21:59
  • @Jainik I will let you know when I did it. – pavel_orekhov Feb 07 '19 at 22:45
  • @Jainik hey, so, I had to reinstall hortonworks, because it stopped working for some reason, idk what the heck happened, but anyways, after I reinstalled it I was not able to write a table with spark and then use it from hive anymore for whatever reason. – pavel_orekhov Feb 08 '19 at 02:51
  • @Jainik I just tried it on MapR and it works without any problem! So this is definitely an HDFS problem, because MapR has a different file system. – pavel_orekhov Feb 08 '19 at 20:19
  • Interesting, I wonder what can be because this seems a normal operation so many people would have this issue by now – Jainik Feb 08 '19 at 20:27
  • @Jainik I know! But I've actually seen a few posts regarding this problem, but they did not have any solutions. Btw, I also asked this on hortonworks' forums: https://community.hortonworks.com/questions/238893/notenoughreplicasexception-when-writing-into-a-par.html – pavel_orekhov Feb 08 '19 at 20:31
  • @Jainik here's a screenshot from Hue on MapR for proof: https://pp.userapi.com/c846420/v846420081/19829c/lPDoZsPwtoU.jpg – pavel_orekhov Feb 08 '19 at 20:33
  • @Jainik I found the solution! – pavel_orekhov Feb 16 '19 at 04:05

1 Answer

I ended up reaching out to the Cloudera forums, and they answered my question in a matter of minutes: http://community.cloudera.com/t5/Storage-Random-Access-HDFS/Why-can-t-I-partition-a-1-gigabyte-dataset-into-300/m-p/86554#M3981 I tried what Harsh J suggested and it worked perfectly!

Here's what he said:

If you are dealing with unordered partitioning from a data source, you can end up creating a lot of files in parallel as the partitioning is attempted.

In HDFS, when a file (or more specifically, its block) is open, the DataNode performs a logical reservation of its target block size. So if your configured block size is 128 MiB, then every concurrently open block will deduct that value (logically) from the available remaining space the DataNode publishes to the NameNode.

This reservation is done to help manage space and guarantees of a full block write to a client, so that a client that's begun writing its file never runs into an out of space exception mid-way.

Note: When the file is closed, only the actual length is persisted, and the reservation calculation is adjusted to reflect the reality of used and available space. However, while the file block remains open, it's always considered to be holding a full block size.

The NameNode further will only select a DataNode for a write if it can guarantee full target block size. It will ignore any DataNodes it deems (based on its reported values and metrics) unfit for the requested write's parameters. Your error shows that the NameNode has stopped considering your only live DataNode when trying to allocate a new block request.

As an example, 70 GiB of available space will prove insufficient if there will be more than 560 concurrent, open files (70 GiB divided into 128 MiB block sizes). So the DataNode will 'appear full' at the point of ~560 open files, and will no longer serve as a valid target for further file requests.
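As a rough check of the arithmetic in that example (this sketch is an illustration, not part of the quoted reply), the number of blocks that can be open at once is simply the free space divided by the block size. The query below assumes a single DataNode with 70 GiB free and ignores replication and any other reserved space; the column names are made up for the example.

```sql
-- Back-of-the-envelope model: every open block reserves a full block size.
-- 70 GiB = 70 * 1024 MiB of free space on the DataNode (assumed figure from the example).
SELECT
  floor(70 * 1024 / 128) AS max_open_blocks_128mib,  -- 71680 / 128 = 560
  floor(70 * 1024 / 8)   AS max_open_blocks_8mib;    -- 71680 / 8   = 8960
```

With a few hundred distinct partition values and several tasks each holding one writer per partition, the 560 limit is easy to reach even though the data itself is only about 1 GB.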

It appears per your description of the insert that this is likely, as each of the 300 chunks of the dataset may still carry varied IDs, resulting in a lot of open files requested per parallel task, for insert into several different partitions.

You could 'hack' your way around this by reducing the request block size within the query (set dfs.blocksize to 8 MiB for ex.), influencing the reservation calculation. However, this may not be a good idea for larger datasets as you scale, since it will drive up the file:block count and increase memory costs for the NameNode.
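A minimal sketch of that workaround, assuming the override is applied per session from the Hive prompt before the insert runs (the value is just 8 MiB expressed in bytes; whether a session-level override is honoured depends on how the cluster is configured):

```sql
-- Workaround only: shrink the block size requested for files written by this session.
-- 8 * 1024 * 1024 = 8388608 bytes (8 MiB).
SET dfs.blocksize=8388608;
```

This only reduces how much space each open file reserves; it does not reduce the number of files opened concurrently.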

A better way to approach this would be to perform a pre-partitioned insert (sort first by partition and then insert in a partitioned manner). Hive for example provides this as an option: hive.optimize.sort.dynamic.partition, and if you use plain Spark or MapReduce then their default strategy of partitioning does exactly this.

So, at the end of the day I did set hive.optimize.sort.dynamic.partition=true; and everything started working. But I also did another thing.

Here's one of my earlier posts from when I was investigating this issue: Why do I get "File could only be replicated to 0 nodes" when writing to a partitioned table? Back then, Hive couldn't partition my dataset because hive.exec.max.dynamic.partitions was set to 100, so I googled the issue and found an answer somewhere on the Hortonworks forums saying that I should just do this:

SET hive.exec.max.dynamic.partitions=100000; 
SET hive.exec.max.dynamic.partitions.pernode=100000;

This turned out to be another problem. Maybe Hive tries to open as many concurrent connections as hive.exec.max.dynamic.partitions allows; in any case, my insert query didn't start working until I decreased both values to 500.
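Putting the pieces together, the session that worked looks roughly like the sketch below. The sort option and the two limits of 500 are the settings described above; the nonstrict mode line is an assumption on my part (it is normally required when every partition column is dynamic, and your cluster may already set it).

```sql
-- Assumption: allow an insert where all partition columns are dynamic.
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Sort rows by partition key before writing, so each task writes
-- one partition file at a time instead of holding many files open.
SET hive.optimize.sort.dynamic.partition=true;

-- Keep the dynamic partition limits modest rather than 100000.
SET hive.exec.max.dynamic.partitions=500;
SET hive.exec.max.dynamic.partitions.pernode=500;

-- Same insert as in the question; the partition column goes last in the select list.
INSERT INTO TABLE partbrowserdata PARTITION (cityid)
SELECT BidID, Timestamp_, iPinYouID, UserAgent, IP, RegionID, AdExchange,
       Domain, URL, AnonymousURL, AdSlotID, AdSlotWidth, AdSlotHeight,
       AdSlotVisibility, AdSlotFormat, AdSlotFloorPrice, CreativeID,
       BiddingPrice, AdvertiserID, UserProfileIDs, CityID
FROM browserdata;
```

If the dataset has more than 500 distinct CityID values, the limits would need to be raised accordingly.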

pavel_orekhov