
I am using Scala to parse CSV files. Some of these files have fields containing non-textual data such as images or octet-streams. I would like to use Apache Spark's textFile() method to split the CSV into rows, and

split(",[ ]*(?=([^\"]*\"[^\"]*\")*[^\"]*$)")

to split each row into fields. Unfortunately this does not work for files that contain the binary fields mentioned above. There are two problems: 1) the octet-streams can contain newlines, which make textFile() split rows that should be one, and 2) the octet-streams contain commas and/or double quotes which are not escaped and mess up my schema.
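
For reference, this is roughly my current (broken) pipeline; the path is illustrative and sc is the usual SparkContext:

    // textFile() for rows, then the quote-aware regex for fields.
    // Breaks as soon as a binary field contains newlines or stray quotes.
    val rows   = sc.textFile("/path/to/file.csv")
    val fields = rows.map(_.split(",[ ]*(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1))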

The files are usually big, from a couple of MB up to a few hundred MB. I have to take the CSVs as they are, although I could preprocess them.

All I want is a working split function so that I can ignore the field with the octet-stream. A great bonus would be to extract the textual information in the octet-stream.

So how would I go forward to solve my problems?

Edit: A typical record, obtained with cat (shortened; the newlines are in the file, not added for cosmetic purposes):

7,url,user,02/24/2015 02:29:00 AM,03/22/2015 03:12:36 PM,octet-stream,27156,"MSCF^@^@^@^@�,^@^@^@^@^@^@D^@^@^@^@^@^@^@^C^A^A^@^C^@^D^@^@^@^@^@^T^@^@^@^@^@^P^@�,^@^@^X=^@^@^@^@^@^@^@^@^@^@�^@^@^@^E^@^A^@��^A^@^@^@^@^@^@^@WF6�!^@Info.txt^@=^B^@^@��^A^@^@^@WF7�^@^@List.xml^@^�^@^@��^A^@^@^@WF:�^@^@Filename.txt^@��>��
^@�CK�]�r��^Q��T�^O�^@�-�j�]��FI�Ky��Ei�Je^K""!�^Qx     @�*^U^?�^_�;��ħ�^LI^@$(�^Q���b��\N����t�����+������ȷgvM�^L̽�LǴL�^L��^ER��w^Ui^M��^X�Kޓ�^QJȧ��^N~��&�x�bB��D]1�^B|^G���g^SyG�����:����^_P�^T�^_�����U�|B�gH=��%Z^NY���,^U�^VI{��^S�^U�!�^Lpw�T���+�a�z�l������b����w^K��or��pH�   ��ܞ�l��z�^\i=�z�:^C�^S!_ESCW��ESC""��g^NY2��s�� u���X^?�^R^R+��b^]^Ro�r���^AR�h�^D��^X^M�^]ޫ���ܰ�^]���0^?��^]�92^GhCx�DN^?
mY<{��L^Zk�^\���M�^V^HE���-Ե�$f�f����^D�e�^R:�u����� ^E^A�Ȑ�^B�^E�sZ���Yo��8Eސ�}��&JY���^A9^P������^P����~Jʭy��`�^9«�""�U�      �:�}3���6�Hߧ�v���A7^Xi^L^]�sA�^Q�7�5d�^Xo˛�tY
Bp��4�Y���7DkV_���\^_q~�w�|�a�s̆���#�g�ӳu�^�!W}�n��Rgż_2�]�p�2}��b�G9�M^Q
�����:�X����bR[ԳZV!^G����^U�tq�&�Y6b��GR���s#mn6Z=^ZH^]�b��R^G�C�0R��{r1��4�#�
=r/X2�^O�����r^M�Rȕ�goG^X-����}���P+˥Qf�#��^C�Բ�z1�I�j����6�^Np���ܯ^P�[�^Tzԏ���^F2�e��\�E�߻6c�%���$�:E�*�*©t�y�J�,�S�2U�S�^X}ME�]��]�i��G�su�""��!�-��!r'ܷe_et Y^K^?0���l^A��^^�m�1/q����|�_r�5$�%�([x��W^E�G^^y���@����Z2^?ڠ�^_��^AҶ�OO��^]�vq%:j�^?�jX��\�]����^S�^^n�^C��>.^CY^O-� �_�\K����:p�<7Sֺnj���-Yk�r���^Q^M�n�J^B��^Z0^?�(^C��^W³!�g�Z�~R�A^M�^O^^�%;��Ԗ�p^S�w���*m^S���jڒ|�����<�^S�;Z^^Fc�1���^O�G_o����8��CS���w��^?��n�2~��m���G;��rx4�(�]�'��^E���eƧ�x��.�w�9WO�^^�י3��0,�y��H�Y�.H�x�""'���h}灢^T�Gm;^XE�̼�J��c�^^;�^A�qZ1ׁBZ^Q�^A^FB�^QbQ�_�3|ƺ�EvZ���^S�w���^P���9^MT��ǩY[+�+�9�Ԩ�^O�^Q���Fy(+�9p�^^Mj�2��Y^?��ڞ��^Ķb�^Z�ψMр}�ڣ�^^S�^?��^U�^Wڻ����z�^@��uk��k^^�>^O�^W�ݤO�h�^G�����Kˇ�.�R|�)-��e^G�^]�/J����U�ϴ�a���i5HO�^L�ESCg�R'���.����d���+~�}��ڝ^Y5]l�3jg54M�������2t�5^Y}�q)��^O;�X\�q^Ox~Vۗ�t�^\f�       >k;^G�K5��,��X�t/�ǧ^G""5��4^MiΟ�n��^B^]�|�����V��ߌ֗Q~�H���8��t��5��ܗ�
�Z�^c�6N�ESCG����^_��>��t^L^R�^:�x���^]v�{^@+KM��qԎ�.^S�%&��=^W-�=�^S�����^CI���&^]_�s�˞�y�z�Jc^W�kڠ�^\��^]j�����^O��;�oY^^�^V59;�c��^B��T�nb����^C��^N��s�x�<{�9-�F�T�^N�5�^Se-���^T�Y[���`^ZsL��v�բ<C�+�~�^ۚ��""�Yκ2^_�^VxT�>��/ݳ^U�m�^@���3^Ge�n^Vc�V�^@�NVn�,�q��^^^]gy�R�S��Ȃ$���>A�d����xg�^GB3�M�J�^QJ^]�^\�{.�D��碎�^W�8a����qޠl?,'^R�^X�Cgy�P[����mڞ��H�Z�s�SD&蠤�s�E��nu�O@O<��3wj`C-%w�W�J�^WP^T�^]r^NT�TC�Lq�Z�f�!�;�l�Y��Gb��>�ud�hx�Ԭ^N)9�^N!k�҉s�35v������.�""^]��~4������۴�Z^]u�^Ti^^�i:�)K��P᳕!�@�^?�>��EE^VE-u�^SgV^L��<��^D�O<�+�J.�c�Z#>�.l����^S� 
ESC��(��E�j�π쬖���2{^U&b\��P^S�`^O^XdL�^ 6bu��FD��^@^@^@^@","field_x, data",field_y,field_z

Expected output would be an array:

("7","url","user","02/24/2015 02:29:00 AM","03/22/2015 03:12:36 PM","octet-stream","27156","field_x, data",field_y",field_z")

Or, but this is probably another question, such an array (like running strings on the octet-stream field):

("7","url","user","02/24/2015 02:29:00 AM","03/22/2015 03:12:36 PM","octet-stream","27156","Info.txt List.xml Filename.txt","field_x, data",field_y",field_z")

Edit 2: Every file that has a binary field also contains a length field for it. So instead of splitting directly I can walk left to right through my record and extract the fields. This is certainly a great improvement over my current situation, but problem 1) still persists. How can I split those files reliably?

I took a closer look at the files and a header looks like this:

RecordId, Field_A, Content_Type, Content_Length, Content, Field_B

(Where Content_Type can be "octet-stream", Content_Length is the number of bytes in the Content field, and Content is, obviously, the data.) Luckily for me, the value of Field_B is predictable; let's assume that for a certain file it's always "Hello World".
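
With that layout, the left-to-right walk I have in mind looks roughly like this. The names and the ISO-8859-1 decoding are illustrative; it assumes the fields before Content contain no commas and that Content_Length matches the byte count as stored on disk:

    def parseRecord(record: Array[Byte]): Seq[String] = {
      var pos = 0
      def nextField(): String = {
        val start = pos
        while (pos < record.length && record(pos) != ','.toByte) pos += 1
        val field = new String(record, start, pos - start, "ISO-8859-1")
        pos += 1 // step over the comma
        field
      }
      val recordId      = nextField()
      val fieldA        = nextField()
      val contentType   = nextField()
      val contentLength = nextField()
      // Skip: opening quote + Content bytes + closing quote + comma.
      pos += 1 + contentLength.trim.toInt + 1 + 1
      val fieldB = new String(record, pos, record.length - pos, "ISO-8859-1")
      Seq(recordId, fieldA, contentType, contentLength, fieldB)
    }

That takes care of the commas and quotes, i.e. problem 2); the newline problem remains.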

So instead of using Spark's default behaviour of splitting on newlines, how can I make Spark split only on newlines that follow "Hello World"? (I also edited the question title since the focus of the question changed.)

flowit
  • You can start by providing example data and expected output. Some details, like the size of the individual files, could be useful too. If you cannot figure out what the problem is when you have access to the data, how can we? – zero323 Nov 18 '15 at 11:05
  • What is "27156"? The length of the stream, maybe? – Nyavro Nov 18 '15 at 11:35
  • Yes, it is. I could read the whole file from the beginning until I reach that number and read the next x bytes as the octet-stream, but I would give up the parallelism Spark offers me. – flowit Nov 18 '15 at 11:37
  • Are you confident that the .csv can actually be parsed? If so, one of the following conditions would seem to necessarily prevail: 1) all delimiters within the octet stream have been escaped (e.g., each is preceded by a backslash or other escape character), or 2) the octet stream has a header field that specifies its length (perhaps the preceding column, 27156, as Nyavro suggested). – philwalk Nov 18 '15 at 15:44
  • Although option 2) would require some metadata to specify column types (perhaps the first row contains column names with implicit column types?) – philwalk Nov 18 '15 at 15:50
  • Please see Edit 2, thanks so far, this really helped a lot. – flowit Nov 18 '15 at 20:32

2 Answers


As answered in Spark: Reading files using different delimiter than new line, I used textinputformat.record.delimiter to split on "Hello World\n", because I am lucky enough that the last column always contains the same value. After that I simply walk left to right through the record, and when I reach the length field I skip the next n bytes. Everything works now. Thanks for pointing me in the right direction.
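
In case it helps someone, the record splitting looks roughly like this (the path is illustrative, and sc is the usual SparkContext):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val conf = new Configuration(sc.hadoopConfiguration)
    // End a record at "Hello World\n" instead of at every newline, so the
    // newlines inside the octet-stream no longer break records apart.
    conf.set("textinputformat.record.delimiter", "Hello World\n")

    val records = sc
      .newAPIHadoopFile("/path/to/file.csv", classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], conf)
      .map { case (_, text) => text.toString } // copy; Hadoop reuses Text objects

One caveat: Text decodes the bytes as UTF-8, so the binary field will not necessarily survive byte-exact. Since I only skip over it, that doesn't matter in my case.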

flowit

There are two problems: 1) the octet-streams can contain newlines, which make textFile() split rows that should be one, and 2) the octet-streams contain commas and/or double quotes which are not escaped and mess up my schema.

Well, actually, that CSV file is properly escaped:

  • the multiline field is enclosed in double quotes: "MSCF^@ .. ^@^@" (which also handles possible separators inside the field)
  • double quotes inside the field are escaped with another double quote, as they should be: Je^K""!

Of course a simple split will not work in this case (and should never be used on CSV data), but any CSV reader able to handle multiline fields should parse that data correctly.

Also keep in mind that the doubled quotes inside the octet-stream have to be unescaped when reading, or that data won't be valid (another reason not to use split, but a CSV reader that handles this).
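
For example, here is a minimal sketch with Apache Commons CSV; any CSV reader with multiline-quote support should behave the same. It assumes commons-csv is on the classpath and Scala 2.13's collection converters:

    import java.io.StringReader
    import org.apache.commons.csv.CSVFormat
    import scala.jdk.CollectionConverters._

    val record = "a,\"multi\nline \"\"quoted\"\" field\",\"x, y\",z"

    // Quoted fields may span lines; doubled quotes are unescaped for us.
    val fields = CSVFormat.DEFAULT
      .parse(new StringReader(record))
      .getRecords.asScala
      .flatMap(_.iterator().asScala)

    fields.foreach(println) // a, then the multiline field, then "x, y", then z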

Danny_ds
  • 11,201
  • 1
  • 24
  • 46